1use crate::{
6 gc::report::GcReport,
7 id::{IdGenerator, SegmentId},
8 index::Writer as IndexWriter,
9 manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
10 path::absolute_path,
11 scanner::{Scanner, SizeMap},
12 segment::merge::MergeReader,
13 version::Version,
14 BlobCache, Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter,
15 UserValue, ValueHandle,
16};
17use std::{
18 fs::File,
19 io::{BufReader, Seek},
20 path::{Path, PathBuf},
21 sync::{atomic::AtomicU64, Arc, Mutex},
22};
23
/// Unique identifier for a value-log instance within this process.
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;

/// Hands out process-wide unique value-log IDs, starting at 0.
///
/// `Relaxed` ordering is sufficient here: only uniqueness of the returned
/// values matters, not ordering with respect to other memory operations.
pub fn get_next_vlog_id() -> ValueLogId {
    static NEXT_VLOG_ID: AtomicU64 = AtomicU64::new(0);
    NEXT_VLOG_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
33
34fn unlink_blob_files(base_path: &Path, ids: &[SegmentId]) {
35 for id in ids {
36 let path = base_path.join(SEGMENTS_FOLDER).join(id.to_string());
37
38 if let Err(e) = std::fs::remove_file(&path) {
39 log::error!("Could not free blob file at {path:?}: {e:?}");
40 }
41 }
42}
43
/// A disk-resident value log: a cheaply-cloneable handle around a shared
/// [`ValueLogInner`] (cloning only bumps the inner `Arc`'s refcount).
#[derive(Clone)]
pub struct ValueLog<BC: BlobCache, C: Compressor + Clone>(Arc<ValueLogInner<BC, C>>);
47
// Lets callers use `ValueLogInner`'s fields and methods directly on a
// `ValueLog` handle (e.g. `value_log.manifest`, `value_log.path`).
impl<BC: BlobCache, C: Compressor + Clone> std::ops::Deref for ValueLog<BC, C> {
    type Target = ValueLogInner<BC, C>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
55
/// Shared state behind a [`ValueLog`] handle.
#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<BC: BlobCache, C: Compressor + Clone> {
    // Process-unique value-log ID (from `get_next_vlog_id`); used as part of
    // the blob cache key so multiple value-logs can share one cache.
    id: u64,

    /// Base folder of this value log on disk.
    pub path: PathBuf,

    // Configuration (blob cache, compression, segment size, ...).
    config: Config<BC, C>,

    // Cache of blob values, keyed by (value-log ID, value handle).
    blob_cache: BC,

    /// Tracks the set of segments (blob files) that make up the value log.
    #[doc(hidden)]
    pub manifest: SegmentManifest<C>,

    // Generates IDs for newly created segments (see `get_writer_raw`).
    id_generator: IdGenerator,

    /// Serializes rollover/GC against scans, drops, and writer registration;
    /// locked by `verify`, `drop_stale_segments`, `scan_for_stats`, `clear`
    /// and `rollover`.
    #[doc(hidden)]
    pub rollover_guard: Mutex<()>,
}
82
83impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
84 pub fn open<P: Into<PathBuf>>(
90 path: P, config: Config<BC, C>,
92 ) -> crate::Result<Self> {
93 let path = path.into();
94
95 if path.join(VLOG_MARKER).try_exists()? {
96 Self::recover(path, config)
97 } else {
98 Self::create_new(path, config)
99 }
100 }
101
102 #[doc(hidden)]
123 pub fn verify(&self) -> crate::Result<usize> {
124 let _lock = self.rollover_guard.lock().expect("lock is poisoned");
125
126 let mut sum = 0;
127
128 for item in self.get_reader()? {
129 let (k, v, _, expected_checksum) = item?;
130
131 let mut hasher = xxhash_rust::xxh3::Xxh3::new();
132 hasher.update(&k);
133 hasher.update(&v);
134
135 if hasher.digest() != expected_checksum {
136 sum += 1;
137 }
138 }
139
140 Ok(sum)
141 }
142
    /// Creates a new, empty value log at `path`.
    ///
    /// Writes the versioned `VLOG_MARKER` file and creates the segments
    /// folder. Panics (via `assert!`) if a marker file already exists;
    /// [`ValueLog::open`] checks for the marker first and routes to
    /// `recover` instead.
    pub(crate) fn create_new<P: Into<PathBuf>>(
        path: P,
        config: Config<BC, C>,
    ) -> crate::Result<Self> {
        let path = absolute_path(path.into());
        log::trace!("Creating value-log at {}", path.display());

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(VLOG_MARKER);
        assert!(!marker_path.try_exists()?);

        std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;

        // Write the marker with a version header and flush it to disk before
        // anything else can observe this directory as a value log.
        let mut file = std::fs::File::create(marker_path)?;
        Version::V1.write_file_header(&mut file)?;
        file.sync_all()?;

        // Sync the directories themselves so the new directory entries are
        // durable (directory fsync is not available on Windows).
        #[cfg(not(target_os = "windows"))]
        {
            let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
            folder.sync_all()?;

            let folder = std::fs::File::open(&path)?;
            folder.sync_all()?;
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::create_new(&path)?;

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::default(),
            rollover_guard: Mutex::new(()),
        })))
    }
189
190 pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<BC, C>) -> crate::Result<Self> {
191 let path = path.into();
192 log::info!("Recovering vLog at {}", path.display());
193
194 {
195 let bytes = std::fs::read(path.join(VLOG_MARKER))?;
196
197 if let Some(version) = Version::parse_file_header(&bytes) {
198 if version != Version::V1 {
199 return Err(crate::Error::InvalidVersion(Some(version)));
200 }
201 } else {
202 return Err(crate::Error::InvalidVersion(None));
203 }
204 }
205
206 let blob_cache = config.blob_cache.clone();
207 let manifest = SegmentManifest::recover(&path)?;
208
209 let highest_id = manifest
210 .segments
211 .read()
212 .expect("lock is poisoned")
213 .values()
214 .map(|x| x.id)
215 .max()
216 .unwrap_or_default();
217
218 Ok(Self(Arc::new(ValueLogInner {
219 id: get_next_vlog_id(),
220 config,
221 path,
222 blob_cache,
223 manifest,
224 id_generator: IdGenerator::new(highest_id + 1),
225 rollover_guard: Mutex::new(()),
226 })))
227 }
228
    /// Registers a finished segment writer's output with the manifest.
    ///
    /// Takes the rollover lock so registration cannot interleave with a
    /// concurrent rollover.
    ///
    /// # Errors
    ///
    /// Propagates any error from the manifest registration.
    pub fn register_writer(&self, writer: SegmentWriter<C>) -> crate::Result<()> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
        self.manifest.register(writer)?;
        Ok(())
    }
239
    /// Returns the number of segments (blob files) tracked by the manifest.
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.manifest.len()
    }
245
    /// Resolves a value handle to its stored value.
    ///
    /// Equivalent to [`ValueLog::get_with_prefetch`] with a prefetch size of 0.
    pub fn get(&self, vhandle: &ValueHandle) -> crate::Result<Option<UserValue>> {
        self.get_with_prefetch(vhandle, 0)
    }
254
    /// Resolves a value handle, optionally warming the blob cache with up to
    /// `prefetch_size` blobs that follow it in the same segment.
    ///
    /// Returns `Ok(None)` if the handle's segment is no longer in the
    /// manifest, or if no blob can be read at the handle's offset.
    pub fn get_with_prefetch(
        &self,
        vhandle: &ValueHandle,
        prefetch_size: usize,
    ) -> crate::Result<Option<UserValue>> {
        // Fast path: already cached (keyed by value-log ID + handle).
        if let Some(value) = self.blob_cache.get(self.id, vhandle) {
            return Ok(Some(value));
        }

        let Some(segment) = self.manifest.get_segment(vhandle.segment_id) else {
            return Ok(None);
        };

        // Open the segment file and seek straight to the blob's offset.
        let mut reader = BufReader::new(File::open(&segment.path)?);
        reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
        let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
            .use_compression(self.config.compression.clone());

        let Some(item) = reader.next() else {
            return Ok(None);
        };
        let (_key, val, _checksum) = item?;

        self.blob_cache.insert(self.id, vhandle, val.clone());

        // Prefetch subsequent blobs. The offset is captured *before* each
        // read so the cached handle points at the blob that was just read.
        // Inner `val` shadows the outer one only inside the loop; the value
        // returned below is still the originally requested blob.
        for _ in 0..prefetch_size {
            let offset = reader.get_offset()?;

            let Some(item) = reader.next() else {
                break;
            };
            let (_key, val, _checksum) = item?;

            let value_handle = ValueHandle {
                segment_id: vhandle.segment_id,
                offset,
            };

            self.blob_cache.insert(self.id, &value_handle, val);
        }

        Ok(Some(val))
    }
307
    /// Creates a fresh segment writer in the segments folder, with no
    /// compression configured (see [`ValueLog::get_writer`] for the
    /// compression-enabled variant).
    fn get_writer_raw(&self) -> crate::Result<SegmentWriter<C>> {
        SegmentWriter::new(
            self.id_generator.clone(),
            self.config.segment_size_bytes,
            self.path.join(SEGMENTS_FOLDER),
        )
        .map_err(Into::into)
    }
316
    /// Creates a segment writer configured with this value log's compression.
    ///
    /// # Errors
    ///
    /// Propagates errors from creating the underlying segment writer.
    pub fn get_writer(&self) -> crate::Result<SegmentWriter<C>> {
        self.get_writer_raw()
            .map(|x| x.use_compression(self.config.compression.clone()))
    }
326
327 pub fn drop_stale_segments(&self) -> crate::Result<u64> {
335 let _guard = self.rollover_guard.lock().expect("lock is poisoned");
337
338 let segments = self
339 .manifest
340 .segments
341 .read()
342 .expect("lock is poisoned")
343 .values()
344 .filter(|x| x.is_stale())
345 .cloned()
346 .collect::<Vec<_>>();
347
348 let bytes_freed = segments.iter().map(|x| x.meta.compressed_bytes).sum();
349
350 let ids = segments.iter().map(|x| x.id).collect::<Vec<_>>();
351
352 if ids.is_empty() {
353 log::trace!("No blob files to drop");
354 } else {
355 log::info!("Dropping stale blob files: {ids:?}");
356 self.manifest.drop_segments(&ids)?;
357
358 for segment in segments {
359 std::fs::remove_file(&segment.path)?;
360 }
361 }
362
363 Ok(bytes_freed)
364 }
365
366 fn mark_as_stale(&self, ids: &[SegmentId]) {
372 #[allow(clippy::significant_drop_tightening)]
374 let segments = self.manifest.segments.read().expect("lock is poisoned");
375
376 for id in ids {
377 let Some(segment) = segments.get(id) else {
378 continue;
379 };
380
381 segment.mark_as_stale();
382 }
383 }
384
    /// Returns the current space amplification factor, as reported by the
    /// manifest.
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        self.manifest.space_amp()
    }
393
394 #[doc(hidden)]
395 #[allow(clippy::cast_precision_loss)]
396 #[must_use]
397 pub fn consume_scan_result(&self, size_map: &SizeMap) -> GcReport {
398 let mut report = GcReport {
399 path: self.path.clone(),
400 segment_count: self.segment_count(),
401 stale_segment_count: 0,
402 stale_bytes: 0,
403 total_bytes: 0,
404 stale_blobs: 0,
405 total_blobs: 0,
406 };
407
408 for (&id, counter) in size_map {
409 let segment = self.manifest.get_segment(id).expect("segment should exist");
410
411 let total_bytes = segment.meta.total_uncompressed_bytes;
412 let total_items = segment.meta.item_count;
413
414 report.total_bytes += total_bytes;
415 report.total_blobs += total_items;
416
417 if counter.item_count > 0 {
418 let used_size = counter.size;
419 let alive_item_count = counter.item_count;
420
421 let segment = self.manifest.get_segment(id).expect("segment should exist");
422
423 let stale_bytes = total_bytes - used_size;
424 let stale_items = total_items - alive_item_count;
425
426 segment.gc_stats.set_stale_bytes(stale_bytes);
427 segment.gc_stats.set_stale_items(stale_items);
428
429 report.stale_bytes += stale_bytes;
430 report.stale_blobs += stale_items;
431 } else {
432 log::debug!(
433 "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
434 segment.meta.compressed_bytes / 1_024,
435 total_bytes / 1_024 / 1_024,
436 );
437 self.mark_as_stale(&[id]);
438
439 report.stale_segment_count += 1;
440 report.stale_bytes += total_bytes;
441 report.stale_blobs += total_items;
442 }
443 }
444
445 report
446 }
447
    /// Scans the given index snapshot (an iterator of value handles and their
    /// on-disk sizes) and produces a garbage-collection report.
    ///
    /// The rollover lock guard is moved into the scanner, so no rollover can
    /// run for the duration of the scan.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from the scan.
    #[allow(clippy::significant_drop_tightening)]
    pub fn scan_for_stats(
        &self,
        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
    ) -> crate::Result<GcReport> {
        let lock_guard = self.rollover_guard.lock().expect("lock is poisoned");

        let ids = self.manifest.list_segment_ids();

        let mut scanner = Scanner::new(iter, lock_guard, &ids);
        scanner.scan()?;
        let size_map = scanner.finish();
        let report = self.consume_scan_result(&size_map);

        Ok(report)
    }
469
470 #[doc(hidden)]
471 pub fn get_reader(&self) -> crate::Result<MergeReader<C>> {
472 let readers = self
473 .manifest
474 .segments
475 .read()
476 .expect("lock is poisoned")
477 .values()
478 .map(|x| x.scan())
479 .collect::<crate::Result<Vec<_>>>()?;
480
481 Ok(MergeReader::new(readers))
482 }
483
    /// Rolls over *every* segment in the manifest — a full GC pass.
    ///
    /// Returns the number of on-disk bytes freed (see [`ValueLog::rollover`]).
    #[doc(hidden)]
    pub fn major_compact<R: IndexReader, W: IndexWriter>(
        &self,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let ids = self.manifest.list_segment_ids();
        self.rollover(&ids, index_reader, index_writer)
    }
494
    /// Applies a GC strategy: lets it pick segments, then rolls them over.
    ///
    /// Returns the number of on-disk bytes freed (see [`ValueLog::rollover`]).
    pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
        &self,
        strategy: &impl GcStrategy<BC, C>,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let segment_ids = strategy.pick(self);
        self.rollover(&segment_ids, index_reader, index_writer)
    }
509
510 pub fn clear(&self, prune_async: bool) -> crate::Result<()> {
514 let guard = self.rollover_guard.lock().expect("lock is poisoned");
515 let ids = self.manifest.list_segment_ids();
516 self.manifest.clear()?;
517 drop(guard);
518
519 if prune_async {
520 let path = self.path.clone();
521
522 std::thread::spawn(move || {
523 log::trace!("Pruning dropped blob files in thread: {ids:?}");
524 unlink_blob_files(&path, &ids);
525 log::trace!("Successfully pruned all blob files");
526 });
527 } else {
528 log::trace!("Pruning dropped blob files: {ids:?}");
529 unlink_blob_files(&self.path, &ids);
530 log::trace!("Successfully pruned all blob files");
531 }
532
533 Ok(())
534 }
535
    /// Rewrites the given segments: blobs still referenced by the index are
    /// copied into a fresh segment, the index is re-pointed at the copies,
    /// and the originals are marked stale (deleted later by
    /// [`ValueLog::drop_stale_segments`]).
    ///
    /// Returns the on-disk bytes saved; `0` if `ids` is empty or any
    /// requested segment is no longer in the manifest.
    ///
    /// # Errors
    ///
    /// Propagates I/O and index errors; on error, segments are *not* marked
    /// stale.
    #[doc(hidden)]
    pub fn rollover<R: IndexReader, W: IndexWriter>(
        &self,
        ids: &[u64],
        index_reader: &R,
        mut index_writer: W,
    ) -> crate::Result<u64> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Only one rollover may run at a time.
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let size_before = self.manifest.disk_space_used();

        log::info!("Rollover segments {ids:?}");

        // All-or-nothing lookup: bail out with 0 if any segment is gone.
        let segments = ids
            .iter()
            .map(|&x| self.manifest.get_segment(x))
            .collect::<Option<Vec<_>>>();

        let Some(segments) = segments else {
            return Ok(0);
        };

        let readers = segments
            .into_iter()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        let reader = MergeReader::new(
            readers
                .into_iter()
                .map(|x| x.use_compression(self.config.compression.clone()))
                .collect(),
        );

        let mut writer = self
            .get_writer_raw()?
            .use_compression(self.config.compression.clone());

        for item in reader {
            let (k, v, segment_id, _) = item?;

            // Skip blobs the index no longer references, and blobs whose
            // index entry already points into a newer segment than the one
            // we read from (the index holds a fresher copy).
            match index_reader.get(&k)? {
                Some(vhandle) if segment_id < vhandle.segment_id => continue,
                None => continue,
                _ => {}
            }

            let vhandle = writer.get_next_value_handle();

            // NOTE(review): `v.len() as u32` truncates values larger than
            // 4 GiB — assumes blob sizes fit in u32; TODO confirm.
            #[allow(clippy::cast_possible_truncation)]
            index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

            writer.write(&k, &v)?;
        }

        // Publish the new segment before finalizing the index rewrite.
        self.manifest.register(writer)?;

        index_writer.finish()?;

        // Originals are now stale; actual file deletion is deferred to
        // `drop_stale_segments`.
        self.mark_as_stale(ids);

        let size_after = self.manifest.disk_space_used();

        Ok(size_before.saturating_sub(size_after))
    }
627}