value_log/value_log.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
    blob_cache::BlobCache,
    gc::report::GcReport,
    id::{IdGenerator, SegmentId},
    index::Writer as IndexWriter,
    manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
    path::absolute_path,
    scanner::{Scanner, SizeMap},
    segment::merge::MergeReader,
    value::UserValue,
    version::Version,
    Compressor, Config, GcStrategy, IndexReader, SegmentReader, SegmentWriter, ValueHandle,
};
use std::{
    fs::File,
    io::{BufReader, Seek},
    path::{Path, PathBuf},
    sync::{atomic::AtomicU64, Arc, Mutex},
};

/// Unique value log ID
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;

/// Hands out a unique (monotonically increasing) value log ID.
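/// IDs are process-local: they come from a static in-memory counter starting at 0
/// and are not persisted across restarts.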
pub fn get_next_vlog_id() -> ValueLogId {
    static VLOG_ID_COUNTER: AtomicU64 = AtomicU64::new(0);
    VLOG_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}

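/// Removes the given blob files from disk.
///
/// Failures are logged and otherwise ignored.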
fn unlink_blob_files(base_path: &Path, ids: &[SegmentId]) {
    for id in ids {
        let path = base_path.join(SEGMENTS_FOLDER).join(id.to_string());

        if let Err(e) = std::fs::remove_file(&path) {
            log::error!("Could not free blob file at {path:?}: {e:?}");
        }
    }
}

/// A disk-resident value log
#[derive(Clone)]
pub struct ValueLog<C: Compressor + Clone>(Arc<ValueLogInner<C>>);

impl<C: Compressor + Clone> std::ops::Deref for ValueLog<C> {
    type Target = ValueLogInner<C>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner<C: Compressor + Clone> {
    /// Unique value log ID
    id: u64,

    /// Base folder
    pub path: PathBuf,

    /// Value log configuration
    config: Config<C>,

    /// In-memory blob cache
    blob_cache: Arc<BlobCache>,

    /// Segment manifest
    #[doc(hidden)]
    pub manifest: SegmentManifest<C>,

    /// Generator to get next segment ID
    id_generator: IdGenerator,

    /// Guards the rollover (compaction) process to only
    /// allow one to happen at a time
    #[doc(hidden)]
    pub rollover_guard: Mutex<()>,
}

impl<C: Compressor + Clone> ValueLog<C> {
    /// Creates or recovers a value log in the given directory.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
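    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest); `MyCompressor` and the way
    /// `config` is constructed are assumptions for illustration:
    ///
    /// ```ignore
    /// let value_log: ValueLog<MyCompressor> = ValueLog::open("/tmp/vlog", config)?;
    /// ```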
    pub fn open<P: Into<PathBuf>>(
        path: P, // TODO: move path into config?
        config: Config<C>,
    ) -> crate::Result<Self> {
        let path = path.into();

        if path.join(VLOG_MARKER).try_exists()? {
            Self::recover(path, config)
        } else {
            Self::create_new(path, config)
        }
    }

    /* /// Prints fragmentation histogram.
    pub fn print_fragmentation_histogram(&self) {
        let lock = self.manifest.segments.read().expect("lock is poisoned");

        for (id, segment) in &*lock {
            let stale_ratio = segment.stale_ratio();

            let progress = (stale_ratio * 10.0) as usize;
            let void = 10 - progress;

            let progress = "=".repeat(progress);
            let void = " ".repeat(void);

            println!(
                "{id:0>4} [{progress}{void}] {}%",
                (stale_ratio * 100.0) as usize
            );
        }
    } */

    #[doc(hidden)]
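    /// Scans the whole value log and returns the number of blobs whose stored
    /// checksum does not match a freshly computed XXH3 hash of key and value.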
    pub fn verify(&self) -> crate::Result<usize> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");

        let mut sum = 0;

        for item in self.get_reader()? {
            let (k, v, _, expected_checksum) = item?;

            let mut hasher = xxhash_rust::xxh3::Xxh3::new();
            hasher.update(&k);
            hasher.update(&v);

            if hasher.digest() != expected_checksum {
                sum += 1;
            }
        }

        Ok(sum)
    }

    /// Creates a new empty value log in a directory.
    pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = absolute_path(path.into());
        log::trace!("Creating value-log at {}", path.display());

        std::fs::create_dir_all(&path)?;

        let marker_path = path.join(VLOG_MARKER);
        assert!(!marker_path.try_exists()?);

        std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;

        // NOTE: Finally, write and fsync the .vlog marker, which contains the version;
        // once it exists, the value log is fully initialized

        let mut file = std::fs::File::create(marker_path)?;
        Version::V1.write_file_header(&mut file)?;
        file.sync_all()?;

        #[cfg(not(target_os = "windows"))]
        {
            // fsync folders on Unix

            let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
            folder.sync_all()?;

            let folder = std::fs::File::open(&path)?;
            folder.sync_all()?;
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::create_new(&path)?;

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::default(),
            rollover_guard: Mutex::new(()),
        })))
    }

    pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config<C>) -> crate::Result<Self> {
        let path = path.into();
        log::info!("Recovering vLog at {}", path.display());

        {
            let bytes = std::fs::read(path.join(VLOG_MARKER))?;

            if let Some(version) = Version::parse_file_header(&bytes) {
                if version != Version::V1 {
                    return Err(crate::Error::InvalidVersion(Some(version)));
                }
            } else {
                return Err(crate::Error::InvalidVersion(None));
            }
        }

        let blob_cache = config.blob_cache.clone();
        let manifest = SegmentManifest::recover(&path)?;

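        // Resume the segment ID generator one past the highest segment ID found on
        // disk, so newly written segments cannot collide with recovered ones.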
        let highest_id = manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.id)
            .max()
            .unwrap_or_default();

        Ok(Self(Arc::new(ValueLogInner {
            id: get_next_vlog_id(),
            config,
            path,
            blob_cache,
            manifest,
            id_generator: IdGenerator::new(highest_id + 1),
            rollover_guard: Mutex::new(()),
        })))
    }

    /// Registers a [`SegmentWriter`].
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn register_writer(&self, writer: SegmentWriter<C>) -> crate::Result<()> {
        let _lock = self.rollover_guard.lock().expect("lock is poisoned");
        self.manifest.register(writer)?;
        Ok(())
    }

    /// Returns the number of segments in the value log.
    #[must_use]
    pub fn segment_count(&self) -> usize {
        self.manifest.len()
    }

    /// Resolves a value handle.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
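    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); `vhandle` is assumed to have been
    /// looked up in the external key-value index:
    ///
    /// ```ignore
    /// if let Some(value) = value_log.get(&vhandle)? {
    ///     // `value` contains the blob bytes the handle points to
    /// }
    /// ```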
    pub fn get(&self, vhandle: &ValueHandle) -> crate::Result<Option<UserValue>> {
        self.get_with_prefetch(vhandle, 0)
    }

    /// Resolves a value handle and prefetches some values after it.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn get_with_prefetch(
        &self,
        vhandle: &ValueHandle,
        prefetch_size: usize,
    ) -> crate::Result<Option<UserValue>> {
        if let Some(value) = self.blob_cache.get(self.id, vhandle) {
            return Ok(Some(value));
        }

        let Some(segment) = self.manifest.get_segment(vhandle.segment_id) else {
            return Ok(None);
        };

        let mut reader = BufReader::new(File::open(&segment.path)?);
        reader.seek(std::io::SeekFrom::Start(vhandle.offset))?;
        let mut reader = SegmentReader::with_reader(vhandle.segment_id, reader)
            .use_compression(self.config.compression.clone());

        let Some(item) = reader.next() else {
            return Ok(None);
        };
        let (_key, val, _checksum) = item?;

        self.blob_cache
            .insert((self.id, vhandle.clone()).into(), val.clone());

        // TODO: maybe we can look at the value size and prefetch some more values
        // without causing another I/O...
        // TODO: benchmark range reads for rather small non-inlined blobs (maybe ~512-1000B)
        // and see how different BufReader capacities and prefetch sizes change range read performance
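        // Decode the next `prefetch_size` blobs from the already-open reader and warm
        // the blob cache with them, keyed by the offset at which each blob starts.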
        for _ in 0..prefetch_size {
            let offset = reader.get_offset()?;

            let Some(item) = reader.next() else {
                break;
            };
            let (_key, val, _checksum) = item?;

            let value_handle = ValueHandle {
                segment_id: vhandle.segment_id,
                offset,
            };

            self.blob_cache.insert((self.id, value_handle).into(), val);
        }

        Ok(Some(val))
    }

    fn get_writer_raw(&self) -> crate::Result<SegmentWriter<C>> {
        SegmentWriter::new(
            self.id_generator.clone(),
            self.config.segment_size_bytes,
            self.path.join(SEGMENTS_FOLDER),
        )
        .map_err(Into::into)
    }

    /// Initializes a new segment writer.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
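    ///
    /// # Example
    ///
    /// A rough sketch of the write path (not compiled as a doctest); the byte-literal
    /// arguments and the surrounding error handling are assumptions for illustration:
    ///
    /// ```ignore
    /// let mut writer = value_log.get_writer()?;
    ///
    /// // Record the handle in the external index so the value can be
    /// // resolved later via `ValueLog::get`.
    /// let vhandle = writer.get_next_value_handle();
    /// writer.write(b"key", b"value")?;
    ///
    /// value_log.register_writer(writer)?;
    /// ```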
    pub fn get_writer(&self) -> crate::Result<SegmentWriter<C>> {
        self.get_writer_raw()
            .map(|x| x.use_compression(self.config.compression.clone()))
    }

    /// Drops stale segments.
    ///
    /// Returns the amount of disk space (compressed data) freed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn drop_stale_segments(&self) -> crate::Result<u64> {
        // IMPORTANT: Only allow 1 rollover or GC at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let segments = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .filter(|x| x.is_stale())
            .cloned()
            .collect::<Vec<_>>();

        let bytes_freed = segments.iter().map(|x| x.meta.compressed_bytes).sum();

        let ids = segments.iter().map(|x| x.id).collect::<Vec<_>>();

        if ids.is_empty() {
            log::trace!("No blob files to drop");
        } else {
            log::info!("Dropping stale blob files: {ids:?}");
            self.manifest.drop_segments(&ids)?;

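            // The manifest no longer references these segments, so their blob files
            // can now be deleted from disk.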
            for segment in segments {
                std::fs::remove_file(&segment.path)?;
            }
        }

        Ok(bytes_freed)
    }

    /// Marks some segments as stale.
    fn mark_as_stale(&self, ids: &[SegmentId]) {
        // NOTE: Read-locking is fine because we are dealing with an atomic bool
        #[allow(clippy::significant_drop_tightening)]
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        for id in ids {
            let Some(segment) = segments.get(id) else {
                continue;
            };

            segment.mark_as_stale();
        }
    }

    // TODO: remove?
    /// Returns the approximate space amplification.
    ///
    /// Returns 0.0 if there are no items.
    #[must_use]
    pub fn space_amp(&self) -> f32 {
        self.manifest.space_amp()
    }

    #[doc(hidden)]
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn consume_scan_result(&self, size_map: &SizeMap) -> GcReport {
        let mut report = GcReport {
            path: self.path.clone(),
            segment_count: self.segment_count(),
            stale_segment_count: 0,
            stale_bytes: 0,
            total_bytes: 0,
            stale_blobs: 0,
            total_blobs: 0,
        };

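        // Fold the scanner's per-segment counters into the report and update each
        // segment's GC stats; segments without any live blobs are marked stale right away.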
        for (&id, counter) in size_map {
            let segment = self.manifest.get_segment(id).expect("segment should exist");

            let total_bytes = segment.meta.total_uncompressed_bytes;
            let total_items = segment.meta.item_count;

            report.total_bytes += total_bytes;
            report.total_blobs += total_items;

            if counter.item_count > 0 {
                let used_size = counter.size;
                let alive_item_count = counter.item_count;

                let stale_bytes = total_bytes - used_size;
                let stale_items = total_items - alive_item_count;

                segment.gc_stats.set_stale_bytes(stale_bytes);
                segment.gc_stats.set_stale_items(stale_items);

                report.stale_bytes += stale_bytes;
                report.stale_blobs += stale_items;
            } else {
                log::debug!(
                    "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
                    segment.meta.compressed_bytes / 1_024,
                    total_bytes / 1_024 / 1_024,
                );
                self.mark_as_stale(&[id]);

                report.stale_segment_count += 1;
                report.stale_bytes += total_bytes;
                report.stale_blobs += total_items;
            }
        }

        report
    }

    /// Scans the given index and collects GC statistics.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[allow(clippy::significant_drop_tightening)]
    pub fn scan_for_stats(
        &self,
        iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
    ) -> crate::Result<GcReport> {
        let lock_guard = self.rollover_guard.lock().expect("lock is poisoned");

        let ids = self.manifest.list_segment_ids();

        let mut scanner = Scanner::new(iter, lock_guard, &ids);
        scanner.scan()?;
        let size_map = scanner.finish();
        let report = self.consume_scan_result(&size_map);

        Ok(report)
    }

    #[doc(hidden)]
    pub fn get_reader(&self) -> crate::Result<MergeReader<C>> {
        let readers = self
            .manifest
            .segments
            .read()
            .expect("lock is poisoned")
            .values()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(MergeReader::new(readers))
    }

    /// Returns the amount of disk space (compressed data) freed.
    #[doc(hidden)]
    pub fn major_compact<R: IndexReader, W: IndexWriter>(
        &self,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let ids = self.manifest.list_segment_ids();
        self.rollover(&ids, index_reader, index_writer)
    }

    /// Applies a GC strategy.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn apply_gc_strategy<R: IndexReader, W: IndexWriter>(
        &self,
        strategy: &impl GcStrategy<C>,
        index_reader: &R,
        index_writer: W,
    ) -> crate::Result<u64> {
        let segment_ids = strategy.pick(self);
        self.rollover(&segment_ids, index_reader, index_writer)
    }

    /// Atomically removes all data from the value log.
    ///
    /// If `prune_async` is set to `true`, the blob files are removed from disk on a background thread to avoid blocking the caller.
    pub fn clear(&self, prune_async: bool) -> crate::Result<()> {
        let guard = self.rollover_guard.lock().expect("lock is poisoned");
        let ids = self.manifest.list_segment_ids();
        self.manifest.clear()?;
        drop(guard);

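        // At this point the manifest no longer references the old segments, so the
        // blob files can be unlinked without holding the rollover guard.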
        if prune_async {
            let path = self.path.clone();

            std::thread::spawn(move || {
                log::trace!("Pruning dropped blob files in thread: {ids:?}");
                unlink_blob_files(&path, &ids);
                log::trace!("Successfully pruned all blob files");
            });
        } else {
            log::trace!("Pruning dropped blob files: {ids:?}");
            unlink_blob_files(&self.path, &ids);
            log::trace!("Successfully pruned all blob files");
        }

        Ok(())
    }

    /// Rewrites some segments into new segment(s), blocking the caller
    /// until the operation is complete.
    ///
    /// Returns the amount of disk space (compressed data) freed.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[doc(hidden)]
    pub fn rollover<R: IndexReader, W: IndexWriter>(
        &self,
        ids: &[u64],
        index_reader: &R,
        mut index_writer: W,
    ) -> crate::Result<u64> {
        if ids.is_empty() {
            return Ok(0);
        }

        // IMPORTANT: Only allow 1 rollover or GC at any given time
        let _guard = self.rollover_guard.lock().expect("lock is poisoned");

        let size_before = self.manifest.disk_space_used();

        log::info!("Rollover segments {ids:?}");

        let segments = ids
            .iter()
            .map(|&x| self.manifest.get_segment(x))
            .collect::<Option<Vec<_>>>();

        let Some(segments) = segments else {
            return Ok(0);
        };

        let readers = segments
            .into_iter()
            .map(|x| x.scan())
            .collect::<crate::Result<Vec<_>>>()?;

        // TODO: 2.0.0: Store uncompressed size per blob
        // so we can avoid recompression costs during GC
        // but have stats be correct

        let reader = MergeReader::new(
            readers
                .into_iter()
                .map(|x| x.use_compression(self.config.compression.clone()))
                .collect(),
        );

        let mut writer = self
            .get_writer_raw()?
            .use_compression(self.config.compression.clone());

        for item in reader {
            let (k, v, segment_id, _) = item?;

            match index_reader.get(&k)? {
                // The index already points to a newer copy in a later segment,
                // so the copy we just read is stale and can be discarded
                Some(vhandle) if segment_id < vhandle.segment_id => continue,
                None => continue,
                _ => {}
            }

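            // Reserve the handle (segment ID + offset) at which the next write will land,
            // so the index entry and the blob's on-disk position agree.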
            let vhandle = writer.get_next_value_handle();

            // NOTE: Truncation is OK because value sizes are at most u32::MAX
            #[allow(clippy::cast_possible_truncation)]
            index_writer.insert_indirect(&k, vhandle, v.len() as u32)?;

            writer.write(&k, &v)?;
        }

        // IMPORTANT: The new segments need to be persisted before being added to the index
        // to avoid dangling pointers
        self.manifest.register(writer)?;

        // NOTE: If we crash here, that's fine: the new segments are registered
        // but never referenced, so they can simply be dropped after recovery
        index_writer.finish()?;

        // IMPORTANT: We only mark the old segments as stale here.
        // The external index decides when it is safe to actually drop them,
        // as some reads may still reference them.
        self.mark_as_stale(ids);

        let size_after = self.manifest.disk_space_used();

        Ok(size_before.saturating_sub(size_after))
    }
}