use crate::{
    blob_cache::BlobCache,
    gc::report::GcReport,
    id::{IdGenerator, SegmentId},
    index::Writer as IndexWriter,
    manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
    path::absolute_path,
    scanner::{Scanner, SizeMap},
    segment::merge::MergeReader,
    value::UserValue,
    version::Version,
    Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter, ValueHandle,
};
use std::{
    fs::File,
    io::{BufReader, Seek},
    path::PathBuf,
    sync::{atomic::AtomicU64, Arc, Mutex},
};
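/// Unique value-log ID.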
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;
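/// Hands out a new, unique value-log ID.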
pub fn get_next_vlog_id() -> ValueLogId {
    static VLOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);

    // Relaxed ordering is enough here: the IDs only need to be unique, not ordered
    VLOG_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
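/// A value log, storing large values ("blobs") in append-only segment files.
///
/// Values are addressed by [`ValueHandle`]s, which are typically kept in
/// an external index such as an LSM-tree.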
#[derive(Clone)]
pub struct ValueLog<C: Compressor + Clone>(Arc<ValueLogInner<C>>);

impl<C: Compressor + Clone> std::ops::Deref for ValueLog<C> {
    type Target = ValueLogInner<C>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<C: Compressor + Clone> {
    /// Unique value-log ID
    id: u64,

    /// Base folder
    pub path: PathBuf,

    /// Value log configuration
    config: Config<C>,

    /// In-memory blob cache
    blob_cache: Arc<BlobCache>,

    /// Segment manifest
    #[doc(hidden)]
    pub manifest: SegmentManifest<C>,

    /// Generator for new segment IDs
    id_generator: IdGenerator,

    /// Guards the rollover (rewrite) process so that only one
    /// rollover or GC operation can run at a time
    #[doc(hidden)]
    pub rollover_guard: Mutex<()>,
}

impl<C: Compressor + Clone> ValueLog<C> {
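    /// Creates or recovers a value log in the given directory.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Illustrative sketch; assumes a `Config` that fits your workload
    /// // (compressor, blob cache, segment size).
    /// let value_log = ValueLog::open("my_value_log", config)?;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.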
    pub fn open<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = path.into();

        if path.join(VLOG_MARKER).try_exists()? {
            Self::recover(path, config)
        } else {
            Self::create_new(path, config)
        }
    }
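    /// Scans the entire value log, returning the number of blobs whose
    /// stored checksum does not match their key and value.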
    #[doc(hidden)]
    pub fn verify(&self) -> crate::Result<usize> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");

        let mut sum = 0;

        for item in self.get_reader()? {
            let (k, v, _, expected_checksum) = item?;

            // Recompute the checksum over key + value and compare
            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
            hasher.update(&k);
            hasher.update(&v);

            if hasher.digest() != expected_checksum {
                sum += 1;
            }
        }

        Ok(sum)
    }

    pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = absolute_path(path.into());
        log::trace!("Creating value-log at {}", path.display());

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(VLOG_MARKER);
        assert!(!marker_path.try_exists()?);

        std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;

        // Write the version marker file and fsync it
        let mut file = std::fs::File::create(marker_path)?;
        Version::V1.write_file_header(&mut file)?;
        file.sync_all()?;

        // On non-Windows platforms, fsync the folders as well,
        // so the new directory entries are durable
        #[cfg(not(target_os = "windows"))]
        {
            let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
            folder.sync_all()?;

            let folder = std::fs::File::open(&path)?;
            folder.sync_all()?;
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::create_new(&path)?;

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::default(),
            rollover_guard: Mutex::new(()),
        })))
    }

    pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = path.into();
        log::info!("Recovering vLog at {}", path.display());

        // Check that the on-disk version is one we can read
        {
            let bytes = std::fs::read(path.join(VLOG_MARKER))?;

            if let Some(version) = Version::parse_file_header(&bytes) {
                if version != Version::V1 {
                    return Err(crate::Error::InvalidVersion(Some(version)));
                }
            } else {
                return Err(crate::Error::InvalidVersion(None));
            }
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::recover(&path)?;

        // Continue numbering after the highest recovered segment ID
        let highest_id = manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.id)
            .max()
            .unwrap_or_default();

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::new(highest_id + 1),
            rollover_guard: Mutex::new(()),
        })))
    }
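    /// Registers the segments created by a [`SegmentWriter`], making
    /// their values readable.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.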
    pub fn register_writer(&self, writer: SegmentWriter<C>) -> crate::Result<()> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
        self.manifest.register(writer)?;
        Ok(())
    }
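    /// Returns the number of segments.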
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.manifest.len()
    }
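    /// Resolves a value handle, returning the value if it exists.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// // Illustrative sketch; `vhandle` would typically come from an index lookup.
    /// if let Some(value) = value_log.get(&vhandle)? {
    ///     println!("loaded {} bytes", value.len());
    /// }
    /// ```
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.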
    pub fn get(&self, vhandle: &ValueHandle) -> crate::Result<Option<UserValue>> {
        self.get_with_prefetch(vhandle, 0)
    }
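    /// Resolves a value handle, and additionally caches the next
    /// `prefetch_size` blobs of the same segment.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.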
    pub fn get_with_prefetch(
        &self,
        vhandle: &ValueHandle,
        prefetch_size: usize,
    ) -> crate::Result<Option<UserValue>> {
        // Serve from the blob cache if possible
        if let Some(value) = self.blob_cache.get(self.id, vhandle) {
            return Ok(Some(value));
        }

        let Some(segment) = self.manifest.get_segment(vhandle.segment_id) else {
            return Ok(None);
        };

        // Seek to the handle's offset and read the blob from disk
        let mut reader = BufReader::new(File::open(&segment.path)?);
        reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
        let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
            .use_compression(self.config.compression.clone());

        let Some(item) = reader.next() else {
            return Ok(None);
        };
        let (_key, val, _checksum) = item?;

        self.blob_cache
            .insert((self.id, vhandle.clone()).into(), val.clone());

        // Opportunistically read ahead and cache the following blobs,
        // which is cheap because the reader is already positioned correctly
        for _ in 0..prefetch_size {
            let offset = reader.get_offset()?;

            let Some(item) = reader.next() else {
                break;
            };
            let (_key, val, _checksum) = item?;

            let value_handle = ValueHandle {
                segment_id: vhandle.segment_id,
                offset,
            };

            self.blob_cache.insert((self.id, value_handle).into(), val);
        }

        Ok(Some(val))
    }

    fn get_writer_raw(&self) -> crate::Result<SegmentWriter<C>> {
        SegmentWriter::new(
            self.id_generator.clone(),
            self.config.segment_size_bytes,
            self.path.join(SEGMENTS_FOLDER),
        )
        .map_err(Into::into)
    }
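    /// Initializes a new segment writer that uses the configured compression.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.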
    pub fn get_writer(&self) -> crate::Result<SegmentWriter<C>> {
        self.get_writer_raw()
            .map(|x| x.use_compression(self.config.compression.clone()))
    }
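    /// Drops all segments that are marked as fully stale, returning the
    /// number of compressed bytes freed on disk.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.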
    pub fn drop_stale_segments(&self) -> crate::Result<u64> {
        // Only allow one rollover or GC operation at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let segments = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .filter(|x| x.is_stale())
            .cloned()
            .collect::<Vec<_>>();

        let bytes_freed = segments.iter().map(|x| x.meta.compressed_bytes).sum();

        let ids = segments.iter().map(|x| x.id).collect::<Vec<_>>();

        if ids.is_empty() {
            log::trace!("No blob files to drop");
        } else {
            log::info!("Dropping stale blob files: {ids:?}");
            self.manifest.drop_segments(&ids)?;

            for segment in segments {
                std::fs::remove_file(&segment.path)?;
            }
        }

        Ok(bytes_freed)
    }

    fn mark_as_stale(&self, ids: &[SegmentId]) {
        #[allow(clippy::significant_drop_tightening)]
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        for id in ids {
            let Some(segment) = segments.get(id) else {
                continue;
            };

            segment.mark_as_stale();
        }
    }
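    /// Returns the approximate space amplification of the value log.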
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        self.manifest.space_amp()
    }
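    /// Consumes a scan result, updating each segment's GC statistics and
    /// marking fully unreferenced segments as stale.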
    #[doc(hidden)]
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn consume_scan_result(&self, size_map: &SizeMap) -> GcReport {
        let mut report = GcReport {
            path: self.path.clone(),
            segment_count: self.segment_count(),
            stale_segment_count: 0,
            stale_bytes: 0,
            total_bytes: 0,
            stale_blobs: 0,
            total_blobs: 0,
        };

        for (&id, counter) in size_map {
            let segment = self.manifest.get_segment(id).expect("segment should exist");

            let total_bytes = segment.meta.total_uncompressed_bytes;
            let total_items = segment.meta.item_count;

            report.total_bytes += total_bytes;
            report.total_blobs += total_items;

            if counter.item_count > 0 {
                // Some blobs in this segment are still referenced:
                // track how much of the segment is stale
                let used_size = counter.size;
                let alive_item_count = counter.item_count;

                let stale_bytes = total_bytes - used_size;
                let stale_items = total_items - alive_item_count;

                segment.gc_stats.set_stale_bytes(stale_bytes);
                segment.gc_stats.set_stale_items(stale_items);

                report.stale_bytes += stale_bytes;
                report.stale_blobs += stale_items;
            } else {
                log::debug!(
                    "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
                    segment.meta.compressed_bytes / 1_024,
                    total_bytes / 1_024 / 1_024,
                );
                self.mark_as_stale(&[id]);

                report.stale_segment_count += 1;
                report.stale_bytes += total_bytes;
                report.stale_blobs += total_items;
            }
        }

        report
    }
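    /// Scans the value log for GC statistics, using an iterator over all
    /// value handles that are currently live in the index.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.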
    pub fn scan_for_stats(
        &self,
        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
    ) -> crate::Result<GcReport> {
        // Hold the rollover lock for the duration of the scan,
        // so the segment list cannot change underneath us
        let lock_guard = self.rollover_guard.lock().expect("lock is poisoned");

        let ids = self.manifest.list_segment_ids();
        let mut scanner = Scanner::new(iter, lock_guard, &ids);
        scanner.scan()?;
        let size_map = scanner.finish();
        let report = self.consume_scan_result(&size_map);

        Ok(report)
    }
    #[doc(hidden)]
    pub fn get_reader(&self) -> crate::Result<MergeReader<C>> {
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        let readers = segments
            .values()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(MergeReader::new(readers))
    }
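    /// Rewrites every segment in the value log, i.e. a full GC pass.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.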
    #[doc(hidden)]
    pub fn major_compact<R: IndexReader, W: IndexWriter>(
        &self,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let ids = self.manifest.list_segment_ids();
        self.rollover(&ids, index_reader, index_writer)
    }
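    /// Applies the given GC strategy: the strategy picks the segments to
    /// rewrite, and their live values are relocated into a new segment.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.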
    pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
        &self,
        strategy: &impl GcStrategy<C>,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let segment_ids = strategy.pick(self);
        self.rollover(&segment_ids, index_reader, index_writer)
    }
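    /// Rewrites the given segments, copying over all values that are still
    /// referenced by the index, and marks the input segments as stale.
    ///
    /// Returns how much the on-disk size shrank (saturating at zero).
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.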
    #[doc(hidden)]
    pub fn rollover<R: IndexReader, W: IndexWriter>(
        &self,
        ids: &[u64],
        index_reader: &R,
        mut index_writer: W,
    ) -> crate::Result<u64> {
        if ids.is_empty() {
            return Ok(0);
        }

        // Only allow one rollover or GC operation at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let size_before = self.manifest.disk_space_used();

        log::info!("Rollover segments {ids:?}");

        let segments = ids
            .iter()
            .map(|&x| self.manifest.get_segment(x))
            .collect::<Option<Vec<_>>>();

        let Some(segments) = segments else {
            return Ok(0);
        };

        let readers = segments
            .into_iter()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        let reader = MergeReader::new(
            readers
                .into_iter()
                .map(|x| x.use_compression(self.config.compression.clone()))
                .collect(),
        );

        let mut writer = self
            .get_writer_raw()?
            .use_compression(self.config.compression.clone());

        for item in reader {
            let (k, v, segment_id, _) = item?;

            match index_reader.get(&k)? {
                // Skip values that have been rewritten into a newer segment
                Some(vhandle) if segment_id < vhandle.segment_id => continue,
                // Skip values that are no longer referenced by the index
                None => continue,
                _ => {}
            }

            let vhandle = writer.get_next_value_handle();

            #[allow(clippy::cast_possible_truncation)]
            index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

            writer.write(&k, &v)?;
        }

        // IMPORTANT: Register the new segment before the index is updated,
        // so the freshly written value handles never dangle
        self.manifest.register(writer)?;

        index_writer.finish()?;

        // Only mark the rewritten segments as stale after the index has
        // been written; they can then be dropped safely
        self.mark_as_stale(ids);

        let size_after = self.manifest.disk_space_used();

        Ok(size_before.saturating_sub(size_after))
    }
}