1use crate::{
6 blob_cache::BlobCache,
7 gc::report::GcReport,
8 id::{IdGenerator, SegmentId},
9 index::Writer as IndexWriter,
10 manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
11 path::absolute_path,
12 scanner::{Scanner, SizeMap},
13 segment::merge::MergeReader,
14 value::UserValue,
15 version::Version,
16 Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter, ValueHandle,
17};
18use std::{
19 fs::File,
20 io::{BufReader, Seek},
21 path::{Path, PathBuf},
22 sync::{atomic::AtomicU64, Arc, Mutex},
23};
24
/// Process-wide unique identifier of a [`ValueLog`] instance.
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;

/// Hands out the next free value-log ID from a process-global counter.
pub fn get_next_vlog_id() -> ValueLogId {
    use std::sync::atomic::Ordering::Relaxed;

    // Relaxed suffices here: we only need each returned ID to be unique,
    // not any ordering with respect to other memory operations.
    static NEXT_VLOG_ID: AtomicU64 = AtomicU64::new(0);
    NEXT_VLOG_ID.fetch_add(1, Relaxed)
}
34
35fn unlink_blob_files(base_path: &Path, ids: &[SegmentId]) {
36 for id in ids {
37 let path = base_path.join(SEGMENTS_FOLDER).join(id.to_string());
38
39 if let Err(e) = std::fs::remove_file(&path) {
40 log::error!("Could not free blob file at {path:?}: {e:?}");
41 }
42 }
43}
44
/// A value log: a cheaply cloneable handle around shared inner state
/// (cloning only bumps the inner `Arc`'s refcount).
#[derive(Clone)]
pub struct ValueLog<C: Compressor + Clone>(Arc<ValueLogInner<C>>);
48
// Lets callers use `ValueLogInner`'s fields and methods directly on a
// `ValueLog` handle (e.g. `vlog.manifest`, `vlog.path`).
impl<C: Compressor + Clone> std::ops::Deref for ValueLog<C> {
    type Target = ValueLogInner<C>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
56
/// Shared inner state of a [`ValueLog`], held behind an `Arc`.
#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<C: Compressor + Clone> {
    // Process-unique ID of this value-log instance; used together with a
    // `ValueHandle` as the blob cache key (see `get_with_prefetch`).
    id: u64,

    /// Base folder of the value log on disk.
    pub path: PathBuf,

    // User-supplied configuration (compression, blob cache, segment size, ...).
    config: Config<C>,

    // Cache of resolved blobs, keyed by (value-log ID, value handle).
    blob_cache: Arc<BlobCache>,

    // Manifest of all segments (blob files) currently part of the log.
    #[doc(hidden)]
    pub manifest: SegmentManifest<C>,

    // Generates IDs for newly created segments.
    id_generator: IdGenerator,

    // Serializes rollover, writer registration, stale-segment dropping and
    // stats scans so only one of these runs at a time.
    #[doc(hidden)]
    pub rollover_guard: Mutex<()>,
}
83
impl<C: Compressor + Clone> ValueLog<C> {
    /// Opens a value log in the given directory.
    ///
    /// Recovers an existing log if the vLog marker file is present,
    /// otherwise creates a fresh one.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurred.
    pub fn open<P: Into<PathBuf>>(
        path: P, config: Config<C>,
    ) -> crate::Result<Self> {
        let path = path.into();

        // The marker file's existence decides between recovery and creation.
        if path.join(VLOG_MARKER).try_exists()? {
            Self::recover(path, config)
        } else {
            Self::create_new(path, config)
        }
    }

    /// Scans every blob in the log and returns the number of items whose
    /// stored checksum does not match the XXH3 hash recomputed over
    /// key followed by value (0 means everything verified).
    #[doc(hidden)]
    pub fn verify(&self) -> crate::Result<usize> {
        // Hold the rollover guard for the whole scan so no rollover
        // rewrites or drops segments while we read them.
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");

        let mut sum = 0;

        for item in self.get_reader()? {
            let (k, v, _, expected_checksum) = item?;

            // The checksum covers the key bytes followed by the value bytes.
            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
            hasher.update(&k);
            hasher.update(&v);

            if hasher.digest() != expected_checksum {
                sum += 1;
            }
        }

        Ok(sum)
    }

    /// Creates a brand-new value log: directory structure, segments folder,
    /// and the versioned marker file (fsynced for durability).
    pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = absolute_path(path.into());
        log::trace!("Creating value-log at {}", path.display());

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(VLOG_MARKER);
        // A pre-existing marker would mean the folder already holds a
        // value log; creating over it would be a caller bug.
        assert!(!marker_path.try_exists()?);

        std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;

        let mut file = std::fs::File::create(marker_path)?;
        Version::V1.write_file_header(&mut file)?;
        file.sync_all()?;

        // fsync the directories as well so the new directory entries
        // survive a crash (directory fsync is not available on Windows).
        #[cfg(not(target_os = "windows"))]
        {
            let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
            folder.sync_all()?;

            let folder = std::fs::File::open(&path)?;
            folder.sync_all()?;
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::create_new(&path)?;

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::default(),
            rollover_guard: Mutex::new(()),
        })))
    }

    /// Recovers an existing value log from disk: validates the marker file's
    /// version header, then rebuilds the segment manifest.
    pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = path.into();
        log::info!("Recovering vLog at {}", path.display());

        {
            let bytes = std::fs::read(path.join(VLOG_MARKER))?;

            // Only V1 logs are recoverable by this code path.
            if let Some(version) = Version::parse_file_header(&bytes) {
                if version != Version::V1 {
                    return Err(crate::Error::InvalidVersion(Some(version)));
                }
            } else {
                return Err(crate::Error::InvalidVersion(None));
            }
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::recover(&path)?;

        // Resume segment ID generation after the highest recovered ID
        // (0 if there are no segments yet).
        let highest_id = manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.id)
            .max()
            .unwrap_or_default();

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::new(highest_id + 1),
            rollover_guard: Mutex::new(()),
        })))
    }

    /// Registers a finished segment writer with the manifest, making its
    /// blob file(s) part of the log.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurred.
    pub fn register_writer(&self, writer: SegmentWriter<C>) -> crate::Result<()> {
        // Exclude concurrent rollovers, which also register writers.
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
        self.manifest.register(writer)?;
        Ok(())
    }

    /// Returns the number of segments (blob files) in the log.
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.manifest.len()
    }

    /// Resolves a value handle to its value, or `None` if the segment or
    /// blob no longer exists.
    pub fn get(&self, vhandle: &ValueHandle) -> crate::Result<Option<UserValue>> {
        self.get_with_prefetch(vhandle, 0)
    }

    /// Resolves a value handle and additionally reads up to `prefetch_size`
    /// blobs that follow it in the same segment into the blob cache.
    pub fn get_with_prefetch(
        &self,
        vhandle: &ValueHandle,
        prefetch_size: usize,
    ) -> crate::Result<Option<UserValue>> {
        // Fast path: blob is already cached.
        if let Some(value) = self.blob_cache.get(self.id, vhandle) {
            return Ok(Some(value));
        }

        // The segment may have been dropped (e.g. by GC) in the meantime.
        let Some(segment) = self.manifest.get_segment(vhandle.segment_id) else {
            return Ok(None);
        };

        // Seek directly to the blob's offset and decode it from there.
        let mut reader = BufReader::new(File::open(&segment.path)?);
        reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
        let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
            .use_compression(self.config.compression.clone());

        let Some(item) = reader.next() else {
            return Ok(None);
        };
        let (_key, val, _checksum) = item?;

        self.blob_cache
            .insert((self.id, vhandle.clone()).into(), val.clone());

        // Warm the cache with the following blobs; stop early when the
        // segment ends. The loop-local `val` shadows only inside the loop,
        // so the originally requested value is still returned below.
        for _ in 0..prefetch_size {
            let offset = reader.get_offset()?;

            let Some(item) = reader.next() else {
                break;
            };
            let (_key, val, _checksum) = item?;

            let value_handle = ValueHandle {
                segment_id: vhandle.segment_id,
                offset,
            };

            self.blob_cache.insert((self.id, value_handle).into(), val);
        }

        Ok(Some(val))
    }

    // Creates a new segment writer WITHOUT compression configured;
    // callers are expected to attach compression themselves if needed.
    fn get_writer_raw(&self) -> crate::Result<SegmentWriter<C>> {
        SegmentWriter::new(
            self.id_generator.clone(),
            self.config.segment_size_bytes,
            self.path.join(SEGMENTS_FOLDER),
        )
        .map_err(Into::into)
    }

    /// Creates a new segment writer using the configured compression.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurred.
    pub fn get_writer(&self) -> crate::Result<SegmentWriter<C>> {
        self.get_writer_raw()
            .map(|x| x.use_compression(self.config.compression.clone()))
    }

    /// Drops all segments currently marked as stale and deletes their files.
    ///
    /// Returns the number of compressed bytes freed on disk.
    pub fn drop_stale_segments(&self) -> crate::Result<u64> {
        // No rollover may run while we unlink files it could be reading.
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let segments = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .filter(|x| x.is_stale())
            .cloned()
            .collect::<Vec<_>>();

        let bytes_freed = segments.iter().map(|x| x.meta.compressed_bytes).sum();

        let ids = segments.iter().map(|x| x.id).collect::<Vec<_>>();

        if ids.is_empty() {
            log::trace!("No blob files to drop");
        } else {
            log::info!("Dropping stale blob files: {ids:?}");
            // Remove from the manifest first, then unlink the actual files,
            // so a crash in between leaves orphan files rather than manifest
            // entries pointing at missing files.
            self.manifest.drop_segments(&ids)?;

            for segment in segments {
                std::fs::remove_file(&segment.path)?;
            }
        }

        Ok(bytes_freed)
    }

    // Marks the given segments as stale; IDs not present in the manifest
    // are silently skipped.
    fn mark_as_stale(&self, ids: &[SegmentId]) {
        #[allow(clippy::significant_drop_tightening)]
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        for id in ids {
            let Some(segment) = segments.get(id) else {
                continue;
            };

            segment.mark_as_stale();
        }
    }

    /// Returns the current space amplification factor of the log.
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        self.manifest.space_amp()
    }

    /// Folds a scanner's size map into per-segment GC statistics and an
    /// overall [`GcReport`]. Segments with zero live items are marked stale.
    #[doc(hidden)]
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn consume_scan_result(&self, size_map: &SizeMap) -> GcReport {
        let mut report = GcReport {
            path: self.path.clone(),
            segment_count: self.segment_count(),
            stale_segment_count: 0,
            stale_bytes: 0,
            total_bytes: 0,
            stale_blobs: 0,
            total_blobs: 0,
        };

        for (&id, counter) in size_map {
            // Panics if a scanned ID vanished from the manifest; the caller
            // holds the rollover guard during scans, which should prevent that.
            let segment = self.manifest.get_segment(id).expect("segment should exist");

            let total_bytes = segment.meta.total_uncompressed_bytes;
            let total_items = segment.meta.item_count;

            report.total_bytes += total_bytes;
            report.total_blobs += total_items;

            if counter.item_count > 0 {
                // Segment still has live blobs: record how much of it is stale.
                let used_size = counter.size;
                let alive_item_count = counter.item_count;

                // NOTE(review): re-fetches the same segment, shadowing the one
                // above - looks redundant; verify before removing.
                let segment = self.manifest.get_segment(id).expect("segment should exist");

                let stale_bytes = total_bytes - used_size;
                let stale_items = total_items - alive_item_count;

                segment.gc_stats.set_stale_bytes(stale_bytes);
                segment.gc_stats.set_stale_items(stale_items);

                report.stale_bytes += stale_bytes;
                report.stale_blobs += stale_items;
            } else {
                // No live references at all: the whole segment is garbage.
                log::debug!(
                    "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
                    segment.meta.compressed_bytes / 1_024,
                    total_bytes / 1_024 / 1_024,
                );
                self.mark_as_stale(&[id]);

                report.stale_segment_count += 1;
                report.stale_bytes += total_bytes;
                report.stale_blobs += total_items;
            }
        }

        report
    }

    /// Scans the given index iterator to compute per-segment liveness
    /// statistics, updating GC stats and returning a [`GcReport`].
    #[allow(clippy::significant_drop_tightening)]
    pub fn scan_for_stats(
        &self,
        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
    ) -> crate::Result<GcReport> {
        // The guard is moved into the scanner so rollovers stay blocked
        // for the entire scan.
        let lock_guard = self.rollover_guard.lock().expect("lock is poisoned");

        let ids = self.manifest.list_segment_ids();

        let mut scanner = Scanner::new(iter, lock_guard, &ids);
        scanner.scan()?;
        let size_map = scanner.finish();
        let report = self.consume_scan_result(&size_map);

        Ok(report)
    }

    /// Returns a merged reader over all segments of the log.
    #[doc(hidden)]
    pub fn get_reader(&self) -> crate::Result<MergeReader<C>> {
        let readers = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(MergeReader::new(readers))
    }

    /// Rewrites ALL segments (a rollover over every segment ID).
    /// Returns the number of bytes freed, like [`ValueLog::rollover`].
    #[doc(hidden)]
    pub fn major_compact<R: IndexReader, W: IndexWriter>(
        &self,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let ids = self.manifest.list_segment_ids();
        self.rollover(&ids, index_reader, index_writer)
    }

    /// Lets the given GC strategy pick segments, then rolls them over.
    ///
    /// Returns the number of bytes freed, like [`ValueLog::rollover`].
    pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
        &self,
        strategy: &impl GcStrategy<C>,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let segment_ids = strategy.pick(self);
        self.rollover(&segment_ids, index_reader, index_writer)
    }

    /// Removes every segment from the manifest and unlinks its blob files,
    /// either inline or on a background thread (`prune_async`).
    pub fn clear(&self, prune_async: bool) -> crate::Result<()> {
        let guard = self.rollover_guard.lock().expect("lock is poisoned");
        let ids = self.manifest.list_segment_ids();
        self.manifest.clear()?;
        // Release the guard before (possibly slow) file deletion: the
        // manifest no longer references the files, so deletion can proceed
        // without blocking other operations.
        drop(guard);

        if prune_async {
            let path = self.path.clone();

            std::thread::spawn(move || {
                log::trace!("Pruning dropped blob files in thread: {ids:?}");
                unlink_blob_files(&path, &ids);
                log::trace!("Successfully pruned all blob files");
            });
        } else {
            log::trace!("Pruning dropped blob files: {ids:?}");
            unlink_blob_files(&self.path, &ids);
            log::trace!("Successfully pruned all blob files");
        }

        Ok(())
    }

    /// Rewrites the live blobs of the given segments into a fresh segment,
    /// updating the index to point at the new locations, then marks the old
    /// segments as stale.
    ///
    /// Returns the number of on-disk bytes freed (saturating at 0).
    #[doc(hidden)]
    pub fn rollover<R: IndexReader, W: IndexWriter>(
        &self,
        ids: &[u64],
        index_reader: &R,
        mut index_writer: W,
    ) -> crate::Result<u64> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Only one rollover (or writer registration / stale-drop) at a time.
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let size_before = self.manifest.disk_space_used();

        log::info!("Rollover segments {ids:?}");

        // If ANY requested segment is missing, abort with 0 bytes freed
        // rather than rolling over a partial set.
        let segments = ids
            .iter()
            .map(|&x| self.manifest.get_segment(x))
            .collect::<Option<Vec<_>>>();

        let Some(segments) = segments else {
            return Ok(0);
        };

        let readers = segments
            .into_iter()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        let reader = MergeReader::new(
            readers
                .into_iter()
                .map(|x| x.use_compression(self.config.compression.clone()))
                .collect(),
        );

        let mut writer = self
            .get_writer_raw()?
            .use_compression(self.config.compression.clone());

        for item in reader {
            let (k, v, segment_id, _) = item?;

            match index_reader.get(&k)? {
                // Index points at a newer segment: this copy is obsolete.
                Some(vhandle) if segment_id < vhandle.segment_id => continue,
                // Key no longer exists in the index: drop the blob.
                None => continue,
                _ => {}
            }

            // Reserve the handle first so the index entry matches where
            // the write below will land.
            let vhandle = writer.get_next_value_handle();

            #[allow(clippy::cast_possible_truncation)]
            index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

            writer.write(&k, &v)?;
        }

        // Register the new segment, then flush the index updates.
        self.manifest.register(writer)?;

        index_writer.finish()?;

        // Old segments are only marked stale here; actual deletion happens
        // later via `drop_stale_segments`.
        self.mark_as_stale(ids);

        let size_after = self.manifest.disk_space_used();

        Ok(size_before.saturating_sub(size_after))
    }
}