value_log/value_log.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
    blob_cache::BlobCache,
    gc::report::GcReport,
    id::{IdGenerator, SegmentId},
    index::Writer as IndexWriter,
    manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
    path::absolute_path,
    scanner::{Scanner, SizeMap},
    segment::merge::MergeReader,
    value::UserValue,
    version::Version,
    Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter, ValueHandle,
};
use std::{
    fs::File,
    io::{BufReader, Seek},
    path::PathBuf,
    sync::{atomic::AtomicU64, Arc, Mutex},
};

/// Unique value log ID
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;

/// Hands out a unique (monotonically increasing) value log ID.
pub fn get_next_vlog_id() -> ValueLogId {
    static VLOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
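    // NOTE: `Relaxed` is enough here; we only need every ID to be unique,
    // not any ordering relative to other memory operations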
    VLOG_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

/// A disk-resident value log
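///
/// Cloning is cheap: a [`ValueLog`] is a reference-counted handle to shared state.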
#[derive(Clone)]
pub struct ValueLog<C: Compressor + Clone>(Arc<ValueLogInner<C>>);

impl<C: Compressor + Clone> std::ops::Deref for ValueLog<C> {
    type Target = ValueLogInner<C>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<C: Compressor + Clone> {
    /// Unique value log ID
    id: u64,

    /// Base folder
    pub path: PathBuf,

    /// Value log configuration
    config: Config<C>,

    /// In-memory blob cache
    blob_cache: Arc<BlobCache>,

    /// Segment manifest
    #[doc(hidden)]
    pub manifest: SegmentManifest<C>,

    /// Generator to get next segment ID
    id_generator: IdGenerator,

    /// Guards the rollover (compaction) process so that
    /// only one can run at a time
    #[doc(hidden)]
    pub rollover_guard: Mutex<()>,
}

impl<C: Compressor + Clone> ValueLog<C> {
    /// Creates or recovers a value log in the given directory.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
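    ///
    /// A minimal usage sketch (illustrative only; constructing the [`Config`] is not shown here):
    ///
    /// ```ignore
    /// // Creates the value log if the folder is empty, otherwise recovers it
    /// let value_log = ValueLog::open("my_value_log", config)?;
    /// ```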
    pub fn open<P: Into<PathBuf>>(
        path: P, // TODO: move path into config?
        config: Config<C>,
    ) -> crate::Result<Self> {
        let path = path.into();

        if path.join(VLOG_MARKER).try_exists()? {
            Self::recover(path, config)
        } else {
            Self::create_new(path, config)
        }
    }

    /* /// Prints fragmentation histogram.
    pub fn print_fragmentation_histogram(&self) {
        let lock = self.manifest.segments.read().expect("lock is poisoned");

        for (id, segment) in &*lock {
            let stale_ratio = segment.stale_ratio();

            let progress = (stale_ratio * 10.0) as usize;
            let void = 10 - progress;

            let progress = "=".repeat(progress);
            let void = " ".repeat(void);

            println!(
                "{id:0>4} [{progress}{void}] {}%",
                (stale_ratio * 100.0) as usize
            );
        }
    } */

    #[doc(hidden)]
    pub fn verify(&self) -> crate::Result<usize> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");

        let mut sum = 0;

        for item in self.get_reader()? {
            let (k, v, _, expected_checksum) = item?;

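            // Recompute the xxh3 checksum over key + value and compare it
            // against the checksum stored alongside the entry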
            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
            hasher.update(&k);
            hasher.update(&v);

            if hasher.digest() != expected_checksum {
                sum += 1;
            }
        }

        Ok(sum)
    }

    /// Creates a new empty value log in a directory.
    pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = absolute_path(path.into());
        log::trace!("Creating value-log at {}", path.display());

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(VLOG_MARKER);
        assert!(!marker_path.try_exists()?);

        std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;

        // NOTE: Lastly, fsync .vlog marker, which contains the version
        // -> the V-log is fully initialized

        let mut file = std::fs::File::create(marker_path)?;
        Version::V1.write_file_header(&mut file)?;
        file.sync_all()?;

        #[cfg(not(target_os = "windows"))]
        {
            // fsync folders on Unix
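            // (durability of the newly created files also depends on their
            // parent directory entries having reached disk)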

            let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
            folder.sync_all()?;

            let folder = std::fs::File::open(&path)?;
            folder.sync_all()?;
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::create_new(&path)?;

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::default(),
            rollover_guard: Mutex::new(()),
        })))
    }

    pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = path.into();
        log::info!("Recovering vLog at {}", path.display());

        {
            let bytes = std::fs::read(path.join(VLOG_MARKER))?;

            if let Some(version) = Version::parse_file_header(&bytes) {
                if version != Version::V1 {
                    return Err(crate::Error::InvalidVersion(Some(version)));
                }
            } else {
                return Err(crate::Error::InvalidVersion(None));
            }
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::recover(&path)?;

        let highest_id = manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.id)
            .max()
            .unwrap_or_default();

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::new(highest_id + 1),
            rollover_guard: Mutex::new(()),
        })))
    }

    /// Registers a [`SegmentWriter`].
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
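    ///
    /// A sketch of the intended flow (illustrative only; the exact writer API may differ):
    ///
    /// ```ignore
    /// // Write a blob, remember its handle for the external index,
    /// // then make the finished segment visible to readers
    /// let mut writer = value_log.get_writer()?;
    /// let vhandle = writer.get_next_value_handle();
    /// writer.write(b"my-key", b"my-value")?;
    /// value_log.register_writer(writer)?;
    /// ```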
    pub fn register_writer(&self, writer: SegmentWriter<C>) -> crate::Result<()> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
        self.manifest.register(writer)?;
        Ok(())
    }

    /// Returns the number of segments in the value log.
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.manifest.len()
    }

    /// Resolves a value handle.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
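    ///
    /// A sketch (illustrative only; `vhandle` is the handle previously stored in the external index):
    ///
    /// ```ignore
    /// if let Some(value) = value_log.get(&vhandle)? {
    ///     // the blob was found in the referenced segment (or the blob cache)
    /// }
    /// ```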
    pub fn get(&self, vhandle: &ValueHandle) -> crate::Result<Option<UserValue>> {
        self.get_with_prefetch(vhandle, 0)
    }

    /// Resolves a value handle and prefetches some values after it.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
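    ///
    /// A sketch (illustrative only): resolve one handle and warm the blob cache with
    /// the next 10 values stored in the same segment.
    ///
    /// ```ignore
    /// let value = value_log.get_with_prefetch(&vhandle, 10)?;
    /// ```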
    pub fn get_with_prefetch(
        &self,
        vhandle: &ValueHandle,
        prefetch_size: usize,
    ) -> crate::Result<Option<UserValue>> {
        if let Some(value) = self.blob_cache.get(self.id, vhandle) {
            return Ok(Some(value));
        }

        let Some(segment) = self.manifest.get_segment(vhandle.segment_id) else {
            return Ok(None);
        };

        let mut reader = BufReader::new(File::open(&segment.path)?);
        reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
        let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
            .use_compression(self.config.compression.clone());

        let Some(item) = reader.next() else {
            return Ok(None);
        };
        let (_key, val, _checksum) = item?;

        self.blob_cache
            .insert((self.id, vhandle.clone()).into(), val.clone());

        // TODO: maybe we can look at the value size and prefetch some more values
        // without causing another I/O...
        // TODO: benchmark range reads for rather small non-inlined blobs (maybe ~512-1000B)
        // and see how different BufReader capacities and prefetch change range read performance
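        // Every prefetched blob is cached under the offset at which it starts,
        // so later point reads for those handles can be served from the blob cache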
        for _ in 0..prefetch_size {
            let offset = reader.get_offset()?;

            let Some(item) = reader.next() else {
                break;
            };
            let (_key, val, _checksum) = item?;

            let value_handle = ValueHandle {
                segment_id: vhandle.segment_id,
                offset,
            };

            self.blob_cache.insert((self.id, value_handle).into(), val);
        }

        Ok(Some(val))
    }

    fn get_writer_raw(&self) -> crate::Result<SegmentWriter<C>> {
        SegmentWriter::new(
            self.id_generator.clone(),
            self.config.segment_size_bytes,
            self.path.join(SEGMENTS_FOLDER),
        )
        .map_err(Into::into)
    }

    /// Initializes a new segment writer.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn get_writer(&self) -> crate::Result<SegmentWriter<C>> {
        self.get_writer_raw()
            .map(|x| x.use_compression(self.config.compression.clone()))
    }

    /// Drops stale segments.
    ///
    /// Returns the amount of disk space (compressed data) freed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
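    ///
    /// A sketch (illustrative only):
    ///
    /// ```ignore
    /// // Only segments previously marked as stale (e.g. by a rollover) are removed
    /// let bytes_freed = value_log.drop_stale_segments()?;
    /// log::info!("Freed {bytes_freed} bytes");
    /// ```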
    pub fn drop_stale_segments(&self) -> crate::Result<u64> {
        // IMPORTANT: Only allow 1 rollover or GC at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let segments = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .filter(|x| x.is_stale())
            .cloned()
            .collect::<Vec<_>>();

        let bytes_freed = segments.iter().map(|x| x.meta.compressed_bytes).sum();

        let ids = segments.iter().map(|x| x.id).collect::<Vec<_>>();

        if ids.is_empty() {
            log::trace!("No blob files to drop");
        } else {
            log::info!("Dropping stale blob files: {ids:?}");
            self.manifest.drop_segments(&ids)?;

            for segment in segments {
                std::fs::remove_file(&segment.path)?;
            }
        }

        Ok(bytes_freed)
    }

    /// Marks some segments as stale.
    fn mark_as_stale(&self, ids: &[SegmentId]) {
        // NOTE: Read-locking is fine because we are dealing with an atomic bool
        #[allow(clippy::significant_drop_tightening)]
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        for id in ids {
            let Some(segment) = segments.get(id) else {
                continue;
            };

            segment.mark_as_stale();
        }
    }

    // TODO: remove?
    /// Returns the approximate space amplification.
    ///
    /// Returns 0.0 if there are no items.
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        self.manifest.space_amp()
    }

    #[doc(hidden)]
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn consume_scan_result(&self, size_map: &SizeMap) -> GcReport {
        let mut report = GcReport {
            path: self.path.clone(),
            segment_count: self.segment_count(),
            stale_segment_count: 0,
            stale_bytes: 0,
            total_bytes: 0,
            stale_blobs: 0,
            total_blobs: 0,
        };

        for (&id, counter) in size_map {
            let segment = self.manifest.get_segment(id).expect("segment should exist");

            let total_bytes = segment.meta.total_uncompressed_bytes;
            let total_items = segment.meta.item_count;

            report.total_bytes += total_bytes;
            report.total_blobs += total_items;

            if counter.item_count > 0 {
                let used_size = counter.size;
                let alive_item_count = counter.item_count;

                let segment = self.manifest.get_segment(id).expect("segment should exist");

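                // Whatever the scan did not see as referenced counts as stale,
                // e.g. a 10 MiB / 100-blob segment with 6 MiB / 60 blobs still
                // referenced contributes 4 MiB and 40 blobs of garbage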
                let stale_bytes = total_bytes - used_size;
                let stale_items = total_items - alive_item_count;

                segment.gc_stats.set_stale_bytes(stale_bytes);
                segment.gc_stats.set_stale_items(stale_items);

                report.stale_bytes += stale_bytes;
                report.stale_blobs += stale_items;
            } else {
                log::debug!(
                    "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
                    segment.meta.compressed_bytes / 1_024,
                    total_bytes / 1_024 / 1_024,
                );
                self.mark_as_stale(&[id]);

                report.stale_segment_count += 1;
                report.stale_bytes += total_bytes;
                report.stale_blobs += total_items;
            }
        }

        report
    }

    /// Scans the given index and collects GC statistics.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
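    ///
    /// A sketch (illustrative only; `index` is a hypothetical external index that can
    /// iterate over its stored `(ValueHandle, size)` pairs):
    ///
    /// ```ignore
    /// let report = value_log.scan_for_stats(index.iter().map(Ok))?;
    /// println!("{} of {} blobs are stale", report.stale_blobs, report.total_blobs);
    /// ```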
    #[allow(clippy::significant_drop_tightening)]
    pub fn scan_for_stats(
        &self,
        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
    ) -> crate::Result<GcReport> {
        let lock_guard = self.rollover_guard.lock().expect("lock is poisoned");

        let ids = self.manifest.list_segment_ids();

        let mut scanner = Scanner::new(iter, lock_guard, &ids);
        scanner.scan()?;
        let size_map = scanner.finish();
        let report = self.consume_scan_result(&size_map);

        Ok(report)
    }

    #[doc(hidden)]
    pub fn get_reader(&self) -> crate::Result<MergeReader<C>> {
        let readers = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(MergeReader::new(readers))
    }

    /// Returns the amount of disk space (compressed data) freed.
    #[doc(hidden)]
    pub fn major_compact<R: IndexReader, W: IndexWriter>(
        &self,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let ids = self.manifest.list_segment_ids();
        self.rollover(&ids, index_reader, index_writer)
    }

    /// Applies a GC strategy.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
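    ///
    /// A sketch (illustrative only; `strategy` is any [`GcStrategy`] implementation):
    ///
    /// ```ignore
    /// let bytes_freed = value_log.apply_gc_strategy(&strategy, &index_reader, index_writer)?;
    /// ```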
    pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
        &self,
        strategy: &impl GcStrategy<C>,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let segment_ids = strategy.pick(self);
        self.rollover(&segment_ids, index_reader, index_writer)
    }

    /// Rewrites some segments into new segment(s), blocking the caller
    /// until the operation has completed.
    ///
    /// Returns the amount of disk space (compressed data) freed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[doc(hidden)]
    pub fn rollover<R: IndexReader, W: IndexWriter>(
        &self,
        ids: &[u64],
        index_reader: &R,
        mut index_writer: W,
    ) -> crate::Result<u64> {
        if ids.is_empty() {
            return Ok(0);
        }

        // IMPORTANT: Only allow 1 rollover or GC at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let size_before = self.manifest.disk_space_used();

        log::info!("Rollover segments {ids:?}");

        let segments = ids
            .iter()
            .map(|&x| self.manifest.get_segment(x))
            .collect::<Option<Vec<_>>>();

        let Some(segments) = segments else {
            return Ok(0);
        };

        let readers = segments
            .into_iter()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        // TODO: 2.0.0: Store uncompressed size per blob
        // so we can avoid recompression costs during GC
        // but have stats be correct

        let reader = MergeReader::new(
            readers
                .into_iter()
                .map(|x| x.use_compression(self.config.compression.clone()))
                .collect(),
        );

        let mut writer = self
            .get_writer_raw()?
            .use_compression(self.config.compression.clone());

        for item in reader {
            let (k, v, segment_id, _) = item?;

            match index_reader.get(&k)? {
                // If the index points to a newer copy in a later segment,
                // the copy we just read is outdated and can be discarded
                Some(vhandle) if segment_id < vhandle.segment_id => continue,
                None => continue,
                _ => {}
            }

            let vhandle = writer.get_next_value_handle();

            // NOTE: Truncation is OK because we know values are u32 max
            #[allow(clippy::cast_possible_truncation)]
            index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

            writer.write(&k, &v)?;
        }

        // IMPORTANT: New segments need to be persisted before adding to index
        // to avoid dangling pointers
        self.manifest.register(writer)?;

        // NOTE: If we crash here, it's fine, the segments are registered
        // but never referenced, so they can just be dropped after recovery
        index_writer.finish()?;

        // IMPORTANT: We only mark the segments as definitely stale here.
        // The external index needs to decide when it is safe to drop
        // the old segments, as some reads may still be performed
        self.mark_as_stale(ids);

        let size_after = self.manifest.disk_space_used();

        Ok(size_before.saturating_sub(size_after))
    }
}