value_log/value_log.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
    gc::report::GcReport,
    id::{IdGenerator, SegmentId},
    index::Writer as IndexWriter,
    manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
    path::absolute_path,
    scanner::{Scanner, SizeMap},
    segment::merge::MergeReader,
    version::Version,
    BlobCache, Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter,
    UserValue, ValueHandle,
};
use std::{
    fs::File,
    io::{BufReader, Seek},
    path::{Path, PathBuf},
    sync::{atomic::AtomicU64, Arc, Mutex},
};

/// Unique value log ID
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;

/// Hands out a unique (monotonically increasing) value log ID.
pub fn get_next_vlog_id() -> ValueLogId {
    static VLOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
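    // NOTE: `Relaxed` ordering is enough here: the counter only needs to hand out
    // distinct, increasing IDs and does not synchronize any other memory.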
    VLOG_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

fn unlink_blob_files(base_path: &Path, ids: &[SegmentId]) {
    for id in ids {
        let path = base_path.join(SEGMENTS_FOLDER).join(id.to_string());

        if let Err(e) = std::fs::remove_file(&path) {
            log::error!("Could not free blob file at {path:?}: {e:?}");
        }
    }
}

/// A disk-resident value log
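///
/// Cloning is cheap because the value log is just a reference-counted handle
/// (`Arc`) around its inner state.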
#[derive(Clone)]
pub struct ValueLog<BC: BlobCache, C: Compressor + Clone>(Arc<ValueLogInner<BC, C>>);

impl<BC: BlobCache, C: Compressor + Clone> std::ops::Deref for ValueLog<BC, C> {
    type Target = ValueLogInner<BC, C>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<BC: BlobCache, C: Compressor + Clone> {
    /// Unique value log ID
    id: u64,

    /// Base folder
    pub path: PathBuf,

    /// Value log configuration
    config: Config<BC, C>,

    /// In-memory blob cache
    blob_cache: BC,

    /// Segment manifest
    #[doc(hidden)]
    pub manifest: SegmentManifest<C>,

    /// Generator to get next segment ID
    id_generator: IdGenerator,

    /// Guards the rollover (compaction) process to only
    /// allow one to happen at a time
    #[doc(hidden)]
    pub rollover_guard: Mutex<()>,
}

impl<BC: BlobCache, C: Compressor + Clone> ValueLog<BC, C> {
    /// Creates or recovers a value log in the given directory.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
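    ///
    /// # Example
    ///
    /// A minimal usage sketch. `MyCache` and `MyCompressor` stand in for user-provided
    /// [`BlobCache`] / [`Compressor`] implementations, and the `Config` constructor shown
    /// here is an assumption for illustration, not taken from this file.
    ///
    /// ```rust,ignore
    /// let config = Config::new(MyCache::default());
    /// let value_log = ValueLog::<MyCache, MyCompressor>::open("/path/to/vlog", config)?;
    /// ```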
    pub fn open<P: Into<PathBuf>>(
        path: P, // TODO: move path into config?
        config: Config<BC, C>,
    ) -> crate::Result<Self> {
        let path = path.into();

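        // The presence of the VLOG_MARKER file decides whether we recover an
        // existing value log or initialize a fresh one.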
        if path.join(VLOG_MARKER).try_exists()? {
            Self::recover(path, config)
        } else {
            Self::create_new(path, config)
        }
    }

    /* /// Prints fragmentation histogram.
    pub fn print_fragmentation_histogram(&self) {
        let lock = self.manifest.segments.read().expect("lock is poisoned");

        for (id, segment) in &*lock {
            let stale_ratio = segment.stale_ratio();

            let progress = (stale_ratio * 10.0) as usize;
            let void = 10 - progress;

            let progress = "=".repeat(progress);
            let void = " ".repeat(void);

            println!(
                "{id:0>4} [{progress}{void}] {}%",
                (stale_ratio * 100.0) as usize
            );
        }
    } */

    #[doc(hidden)]
    pub fn verify(&self) -> crate::Result<usize> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");

        let mut sum = 0;

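        // Re-read every blob, re-hash key + value and count the entries whose
        // stored checksum does not match.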
        for item in self.get_reader()? {
            let (k, v, _, expected_checksum) = item?;

            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
            hasher.update(&k);
            hasher.update(&v);

            if hasher.digest() != expected_checksum {
                sum += 1;
            }
        }

        Ok(sum)
    }

    /// Creates a new empty value log in a directory.
    pub(crate) fn create_new<P: Into<PathBuf>>(
        path: P,
        config: Config<BC, C>,
    ) -> crate::Result<Self> {
        let path = absolute_path(path.into());
        log::trace!("Creating value-log at {}", path.display());

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(VLOG_MARKER);
        assert!(!marker_path.try_exists()?);

        std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;

        // NOTE: Lastly, fsync .vlog marker, which contains the version
        // -> the V-log is fully initialized

        let mut file = std::fs::File::create(marker_path)?;
        Version::V1.write_file_header(&mut file)?;
        file.sync_all()?;

        #[cfg(not(target_os = "windows"))]
        {
            // fsync folders on Unix

            let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
            folder.sync_all()?;

            let folder = std::fs::File::open(&path)?;
            folder.sync_all()?;
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::create_new(&path)?;

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::default(),
            rollover_guard: Mutex::new(()),
        })))
    }

    pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<BC, C>) -> crate::Result<Self> {
        let path = path.into();
        log::info!("Recovering vLog at {}", path.display());

        {
            let bytes = std::fs::read(path.join(VLOG_MARKER))?;

            if let Some(version) = Version::parse_file_header(&bytes) {
                if version != Version::V1 {
                    return Err(crate::Error::InvalidVersion(Some(version)));
                }
            } else {
                return Err(crate::Error::InvalidVersion(None));
            }
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::recover(&path)?;

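        // Continue segment ID numbering after the highest ID found on disk,
        // so new segments cannot collide with recovered ones.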
        let highest_id = manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.id)
            .max()
            .unwrap_or_default();

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::new(highest_id + 1),
            rollover_guard: Mutex::new(()),
        })))
    }

    /// Registers a [`SegmentWriter`].
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn register_writer(&self, writer: SegmentWriter<C>) -> crate::Result<()> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
        self.manifest.register(writer)?;
        Ok(())
    }

    /// Returns the number of segments in the value log.
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.manifest.len()
    }

    /// Resolves a value handle.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
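    ///
    /// # Example
    ///
    /// A sketch only; in practice the [`ValueHandle`] comes from an external index:
    ///
    /// ```rust,ignore
    /// if let Some(value) = value_log.get(&vhandle)? {
    ///     // use the blob bytes
    /// }
    /// ```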
    pub fn get(&self, vhandle: &ValueHandle) -> crate::Result<Option<UserValue>> {
        self.get_with_prefetch(vhandle, 0)
    }

    /// Resolves a value handle, and prefetches some values after it.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn get_with_prefetch(
        &self,
        vhandle: &ValueHandle,
        prefetch_size: usize,
    ) -> crate::Result<Option<UserValue>> {
        if let Some(value) = self.blob_cache.get(self.id, vhandle) {
            return Ok(Some(value));
        }

        let Some(segment) = self.manifest.get_segment(vhandle.segment_id) else {
            return Ok(None);
        };

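        // Cache miss: open the blob file and seek straight to the handle's offset.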
        let mut reader = BufReader::new(File::open(&segment.path)?);
        reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
        let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
            .use_compression(self.config.compression.clone());

        let Some(item) = reader.next() else {
            return Ok(None);
        };
        let (_key, val, _checksum) = item?;

        self.blob_cache.insert(self.id, vhandle, val.clone());

        // TODO: maybe we can look at the value size and prefetch some more values
        // without causing another I/O...
        // TODO: benchmark range reads for rather small non-inlined blobs (maybe ~512-1000B)
        // and see how different BufReader capacities and prefetch changes range read performance
        for _ in 0..prefetch_size {
            let offset = reader.get_offset()?;

            let Some(item) = reader.next() else {
                break;
            };
            let (_key, val, _checksum) = item?;

            let value_handle = ValueHandle {
                segment_id: vhandle.segment_id,
                offset,
            };

            self.blob_cache.insert(self.id, &value_handle, val);
        }

        Ok(Some(val))
    }

    fn get_writer_raw(&self) -> crate::Result<SegmentWriter<C>> {
        SegmentWriter::new(
            self.id_generator.clone(),
            self.config.segment_size_bytes,
            self.path.join(SEGMENTS_FOLDER),
        )
        .map_err(Into::into)
    }

    /// Initializes a new segment writer.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn get_writer(&self) -> crate::Result<SegmentWriter<C>> {
        self.get_writer_raw()
            .map(|x| x.use_compression(self.config.compression.clone()))
    }

    /// Drops stale segments.
    ///
    /// Returns the amount of disk space (compressed data) freed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn drop_stale_segments(&self) -> crate::Result<u64> {
        // IMPORTANT: Only allow 1 rollover or GC at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let segments = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .filter(|x| x.is_stale())
            .cloned()
            .collect::<Vec<_>>();

        let bytes_freed = segments.iter().map(|x| x.meta.compressed_bytes).sum();

        let ids = segments.iter().map(|x| x.id).collect::<Vec<_>>();

        if ids.is_empty() {
            log::trace!("No blob files to drop");
        } else {
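            // Unregister the files from the manifest first; only then unlink them from disk.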
            log::info!("Dropping stale blob files: {ids:?}");
            self.manifest.drop_segments(&ids)?;

            for segment in segments {
                std::fs::remove_file(&segment.path)?;
            }
        }

        Ok(bytes_freed)
    }

    /// Marks some segments as stale.
    fn mark_as_stale(&self, ids: &[SegmentId]) {
        // NOTE: Read-locking is fine because we are dealing with an atomic bool
        #[allow(clippy::significant_drop_tightening)]
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        for id in ids {
            let Some(segment) = segments.get(id) else {
                continue;
            };

            segment.mark_as_stale();
        }
    }

    // TODO: remove?
    /// Returns the approximate space amplification.
    ///
    /// Returns 0.0 if there are no items.
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        self.manifest.space_amp()
    }

    #[doc(hidden)]
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn consume_scan_result(&self, size_map: &SizeMap) -> GcReport {
        let mut report = GcReport {
            path: self.path.clone(),
            segment_count: self.segment_count(),
            stale_segment_count: 0,
            stale_bytes: 0,
            total_bytes: 0,
            stale_blobs: 0,
            total_blobs: 0,
        };

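        // For every scanned segment, compare the size and item count that are still
        // referenced against the segment's totals; anything without live references
        // is marked stale right away.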
        for (&id, counter) in size_map {
            let segment = self.manifest.get_segment(id).expect("segment should exist");

            let total_bytes = segment.meta.total_uncompressed_bytes;
            let total_items = segment.meta.item_count;

            report.total_bytes += total_bytes;
            report.total_blobs += total_items;

            if counter.item_count > 0 {
                let used_size = counter.size;
                let alive_item_count = counter.item_count;

                let stale_bytes = total_bytes - used_size;
                let stale_items = total_items - alive_item_count;

                segment.gc_stats.set_stale_bytes(stale_bytes);
                segment.gc_stats.set_stale_items(stale_items);

                report.stale_bytes += stale_bytes;
                report.stale_blobs += stale_items;
            } else {
                log::debug!(
                    "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
                    segment.meta.compressed_bytes / 1_024,
                    total_bytes / 1_024 / 1_024,
                );
                self.mark_as_stale(&[id]);

                report.stale_segment_count += 1;
                report.stale_bytes += total_bytes;
                report.stale_blobs += total_items;
            }
        }

        report
    }

    /// Scans the given index and collects GC statistics.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[allow(clippy::significant_drop_tightening)]
    pub fn scan_for_stats(
        &self,
        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
    ) -> crate::Result<GcReport> {
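        // The rollover guard is handed to the scanner, so no rollover or GC can run
        // while statistics are being collected.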
        let lock_guard = self.rollover_guard.lock().expect("lock is poisoned");

        let ids = self.manifest.list_segment_ids();

        let mut scanner = Scanner::new(iter, lock_guard, &ids);
        scanner.scan()?;
        let size_map = scanner.finish();
        let report = self.consume_scan_result(&size_map);

        Ok(report)
    }

    #[doc(hidden)]
    pub fn get_reader(&self) -> crate::Result<MergeReader<C>> {
        let readers = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(MergeReader::new(readers))
    }

    /// Returns the amount of disk space (compressed data) freed.
    #[doc(hidden)]
    pub fn major_compact<R: IndexReader, W: IndexWriter>(
        &self,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let ids = self.manifest.list_segment_ids();
        self.rollover(&ids, index_reader, index_writer)
    }

    /// Applies a GC strategy.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
        &self,
        strategy: &impl GcStrategy<BC, C>,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let segment_ids = strategy.pick(self);
        self.rollover(&segment_ids, index_reader, index_writer)
    }

    /// Atomically removes all data from the value log.
    ///
    /// If `prune_async` is set to `true`, the blob files are removed from disk in a background thread to avoid blocking the caller.
    pub fn clear(&self, prune_async: bool) -> crate::Result<()> {
        let guard = self.rollover_guard.lock().expect("lock is poisoned");
        let ids = self.manifest.list_segment_ids();
        self.manifest.clear()?;
        drop(guard);

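        // Once the manifest is cleared, the old blob files are no longer referenced,
        // so unlinking them can happen outside the lock (optionally in a background thread).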
        if prune_async {
            let path = self.path.clone();

            std::thread::spawn(move || {
                log::trace!("Pruning dropped blob files in thread: {ids:?}");
                unlink_blob_files(&path, &ids);
                log::trace!("Successfully pruned all blob files");
            });
        } else {
            log::trace!("Pruning dropped blob files: {ids:?}");
            unlink_blob_files(&self.path, &ids);
            log::trace!("Successfully pruned all blob files");
        }

        Ok(())
    }

    /// Rewrites some segments into new segment(s), blocking the caller
    /// until the operation is completely done.
    ///
    /// Returns the amount of disk space (compressed data) freed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[doc(hidden)]
    pub fn rollover<R: IndexReader, W: IndexWriter>(
        &self,
        ids: &[u64],
        index_reader: &R,
        mut index_writer: W,
    ) -> crate::Result<u64> {
        if ids.is_empty() {
            return Ok(0);
        }

        // IMPORTANT: Only allow 1 rollover or GC at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let size_before = self.manifest.disk_space_used();

        log::info!("Rollover segments {ids:?}");

        let segments = ids
            .iter()
            .map(|&x| self.manifest.get_segment(x))
            .collect::<Option<Vec<_>>>();

        let Some(segments) = segments else {
            return Ok(0);
        };

        let readers = segments
            .into_iter()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        // TODO: 2.0.0: Store uncompressed size per blob
        // so we can avoid recompression costs during GC
        // but have stats be correct

        let reader = MergeReader::new(
            readers
                .into_iter()
                .map(|x| x.use_compression(self.config.compression.clone()))
                .collect(),
        );

        let mut writer = self
            .get_writer_raw()?
            .use_compression(self.config.compression.clone());

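        // GC copy phase: merge-read every blob from the selected segments, keep only
        // the blobs the index still points at, rewrite them into the new segment and
        // update the index with their fresh handles.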
        for item in reader {
            let (k, v, segment_id, _) = item?;

            match index_reader.get(&k)? {
                // The index points to a newer segment, so the copy we just read is outdated
                Some(vhandle) if segment_id < vhandle.segment_id => continue,
                // The key no longer exists in the index, so the blob is garbage
                None => continue,
                _ => {}
            }

            let vhandle = writer.get_next_value_handle();

            // NOTE: Truncation is OK because we know values are u32 max
            #[allow(clippy::cast_possible_truncation)]
            index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

            writer.write(&k, &v)?;
        }

        // IMPORTANT: New segments need to be persisted before adding to index
        // to avoid dangling pointers
        self.manifest.register(writer)?;

        // NOTE: If we crash here, it's fine, the segments are registered
        // but never referenced, so they can just be dropped after recovery
        index_writer.finish()?;

        // IMPORTANT: We only mark the segments as definitely stale
        // The external index needs to decide when it is safe to drop
        // the old segments, as some reads may still be performed
        self.mark_as_stale(ids);

        let size_after = self.manifest.disk_space_used();

        Ok(size_before.saturating_sub(size_after))
    }
}