value_log/
manifest.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    id::SegmentId,
7    key_range::KeyRange,
8    segment::{gc_stats::GcStats, meta::Metadata, trailer::SegmentFileTrailer},
9    Compressor, HashMap, Segment, SegmentWriter as MultiWriter,
10};
11use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
12use std::{
13    io::{Cursor, Write},
14    marker::PhantomData,
15    path::{Path, PathBuf},
16    sync::{Arc, RwLock},
17};
18
/// Marker name for a value log.
// NOTE(review): not referenced in this file; presumably the name of a marker file
// that identifies a value log folder — confirm against the caller.
pub const VLOG_MARKER: &str = ".vlog";

/// Name of the sub-folder (inside the value log folder) that holds the segment files.
pub const SEGMENTS_FOLDER: &str = "segments";

/// File name of the segment manifest inside the value log folder.
const MANIFEST_FILE: &str = "vlog_manifest";
22
23/// Atomically rewrites a file
24fn rewrite_atomic<P: AsRef<Path>>(path: P, content: &[u8]) -> std::io::Result<()> {
25    let path = path.as_ref();
26    let folder = path.parent().expect("should have a parent");
27
28    let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
29    temp_file.write_all(content)?;
30    temp_file.persist(path)?;
31
32    #[cfg(not(target_os = "windows"))]
33    {
34        // TODO: Not sure if the fsync is really required, but just for the sake of it...
35        // TODO: also not sure why it fails on Windows...
36        let file = std::fs::File::open(path)?;
37        file.sync_all()?;
38    }
39
40    Ok(())
41}
42
43#[allow(clippy::module_name_repetitions)]
44pub struct SegmentManifestInner<C: Compressor + Clone> {
45    path: PathBuf,
46    pub segments: RwLock<HashMap<SegmentId, Arc<Segment<C>>>>,
47}
48
49#[allow(clippy::module_name_repetitions)]
50#[derive(Clone)]
51pub struct SegmentManifest<C: Compressor + Clone>(Arc<SegmentManifestInner<C>>);
52
53impl<C: Compressor + Clone> std::ops::Deref for SegmentManifest<C> {
54    type Target = SegmentManifestInner<C>;
55
56    fn deref(&self) -> &Self::Target {
57        &self.0
58    }
59}
60
61impl<C: Compressor + Clone> SegmentManifest<C> {
62    fn remove_unfinished_segments<P: AsRef<Path>>(
63        folder: P,
64        registered_ids: &[u64],
65    ) -> crate::Result<()> {
66        for dirent in std::fs::read_dir(folder)? {
67            let dirent = dirent?;
68
69            // IMPORTANT: Skip .DS_Store files when using MacOS
70            if dirent.file_name() == ".DS_Store" {
71                continue;
72            }
73
74            if dirent.file_type()?.is_file() {
75                let segment_id = dirent
76                    .file_name()
77                    .to_str()
78                    .expect("should be valid utf-8")
79                    .parse::<u64>()
80                    .expect("should be valid segment ID");
81
82                if !registered_ids.contains(&segment_id) {
83                    log::trace!("Deleting unfinished vLog segment {segment_id}");
84                    std::fs::remove_file(dirent.path())?;
85                }
86            }
87        }
88
89        Ok(())
90    }
91
92    /// Parses segment IDs from manifest file
93    fn load_ids_from_disk<P: AsRef<Path>>(path: P) -> crate::Result<Vec<SegmentId>> {
94        let path = path.as_ref();
95        log::debug!("Loading manifest from {}", path.display());
96
97        let bytes = std::fs::read(path)?;
98
99        let mut ids = vec![];
100
101        let mut cursor = Cursor::new(bytes);
102
103        let cnt = cursor.read_u64::<BigEndian>()?;
104
105        for _ in 0..cnt {
106            ids.push(cursor.read_u64::<BigEndian>()?);
107        }
108
109        Ok(ids)
110    }
111
112    /// Recovers a value log from disk
113    pub(crate) fn recover<P: AsRef<Path>>(folder: P) -> crate::Result<Self> {
114        let folder = folder.as_ref();
115        let manifest_path = folder.join(MANIFEST_FILE);
116
117        log::info!("Recovering vLog at {folder:?}");
118
119        let ids = Self::load_ids_from_disk(&manifest_path)?;
120        let cnt = ids.len();
121
122        let progress_mod = match cnt {
123            _ if cnt <= 20 => 1,
124            _ if cnt <= 100 => 10,
125            _ => 100,
126        };
127
128        log::debug!("Recovering {cnt} vLog segments from {folder:?}");
129
130        let segments_folder = folder.join(SEGMENTS_FOLDER);
131        Self::remove_unfinished_segments(&segments_folder, &ids)?;
132
133        let segments = {
134            let mut map =
135                HashMap::with_capacity_and_hasher(100, xxhash_rust::xxh3::Xxh3Builder::new());
136
137            for (idx, &id) in ids.iter().enumerate() {
138                log::trace!("Recovering segment #{id:?}");
139
140                let path = segments_folder.join(id.to_string());
141                let trailer = SegmentFileTrailer::from_file(&path)?;
142
143                map.insert(
144                    id,
145                    Arc::new(Segment {
146                        id,
147                        path,
148                        meta: trailer.metadata,
149                        gc_stats: GcStats::default(),
150                        _phantom: PhantomData,
151                    }),
152                );
153
154                if idx % progress_mod == 0 {
155                    log::debug!("Recovered {idx}/{cnt} vLog segments");
156                }
157            }
158
159            map
160        };
161
162        if segments.len() < ids.len() {
163            return Err(crate::Error::Unrecoverable);
164        }
165
166        Ok(Self(Arc::new(SegmentManifestInner {
167            path: manifest_path,
168            segments: RwLock::new(segments),
169        })))
170    }
171
172    pub(crate) fn create_new<P: AsRef<Path>>(folder: P) -> crate::Result<Self> {
173        let path = folder.as_ref().join(MANIFEST_FILE);
174
175        let m = Self(Arc::new(SegmentManifestInner {
176            path,
177            segments: RwLock::new(HashMap::default()),
178        }));
179        Self::write_to_disk(&m.path, &[])?;
180
181        Ok(m)
182    }
183
184    /// Modifies the level manifest atomically.
185    pub(crate) fn atomic_swap<F: FnOnce(&mut HashMap<SegmentId, Arc<Segment<C>>>)>(
186        &self,
187        f: F,
188    ) -> crate::Result<()> {
189        let mut prev_segments = self.segments.write().expect("lock is poisoned");
190
191        // NOTE: Create a copy of the levels we can operate on
192        // without mutating the current level manifest
193        // If persisting to disk fails, this way the level manifest
194        // is unchanged
195        let mut working_copy = prev_segments.clone();
196
197        f(&mut working_copy);
198
199        let ids = working_copy.keys().copied().collect::<Vec<_>>();
200
201        Self::write_to_disk(&self.path, &ids)?;
202        *prev_segments = working_copy;
203
204        // NOTE: Lock needs to live until end of function because
205        // writing to disk needs to be exclusive
206        drop(prev_segments);
207
208        log::trace!("Swapped vLog segment list to: {ids:?}");
209
210        Ok(())
211    }
212
213    /// Drops all segments.
214    ///
215    /// This does not delete the files from disk, but just un-refs them from the manifest.
216    ///
217    /// Once this function completes, the disk files can be safely removed.
218    pub fn clear(&self) -> crate::Result<()> {
219        self.atomic_swap(|recipe| {
220            recipe.clear();
221        })
222    }
223
224    /// Drops the given segments.
225    ///
226    /// This does not delete the files from disk, but just un-refs them from the manifest.
227    ///
228    /// Once this function completes, the disk files can be safely removed.
229    pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
230        self.atomic_swap(|recipe| {
231            recipe.retain(|x, _| !ids.contains(x));
232        })
233    }
234
235    pub fn register(&self, writer: MultiWriter<C>) -> crate::Result<()> {
236        let writers = writer.finish()?;
237
238        self.atomic_swap(move |recipe| {
239            for writer in writers {
240                if writer.item_count == 0 {
241                    log::debug!(
242                        "Writer at {:?} has written no data, deleting empty vLog segment file",
243                        writer.path
244                    );
245                    if let Err(e) = std::fs::remove_file(&writer.path) {
246                        log::warn!(
247                            "Could not delete empty vLog segment file at {:?}: {e:?}",
248                            writer.path
249                        );
250                    };
251                    continue;
252                }
253
254                let segment_id = writer.segment_id;
255
256                recipe.insert(
257                    segment_id,
258                    Arc::new(Segment {
259                        id: segment_id,
260                        path: writer.path,
261                        meta: Metadata {
262                            item_count: writer.item_count,
263                            compressed_bytes: writer.written_blob_bytes,
264                            total_uncompressed_bytes: writer.uncompressed_bytes,
265
266                            // NOTE: We are checking for 0 items above
267                            // so first and last key need to exist
268                            #[allow(clippy::expect_used)]
269                            key_range: KeyRange::new((
270                                writer
271                                    .first_key
272                                    .clone()
273                                    .expect("should have written at least 1 item"),
274                                writer
275                                    .last_key
276                                    .clone()
277                                    .expect("should have written at least 1 item"),
278                            )),
279                        },
280                        gc_stats: GcStats::default(),
281                        _phantom: PhantomData,
282                    }),
283                );
284
285                log::debug!(
286                    "Created segment #{segment_id:?} ({} items, {} userdata bytes)",
287                    writer.item_count,
288                    writer.uncompressed_bytes,
289                );
290            }
291        })?;
292
293        // NOTE: If we crash before before finishing the index write, it's fine
294        // because all new segments will be unreferenced, and thus can be dropped because stale
295
296        Ok(())
297    }
298
299    fn write_to_disk<P: AsRef<Path>>(path: P, segment_ids: &[SegmentId]) -> crate::Result<()> {
300        let path = path.as_ref();
301        log::trace!("Writing segment manifest to {}", path.display());
302
303        let mut bytes = Vec::new();
304
305        let cnt = segment_ids.len() as u64;
306        bytes.write_u64::<BigEndian>(cnt)?;
307
308        for id in segment_ids {
309            bytes.write_u64::<BigEndian>(*id)?;
310        }
311
312        rewrite_atomic(path, &bytes)?;
313
314        Ok(())
315    }
316
317    /// Gets a segment
318    #[must_use]
319    pub fn get_segment(&self, id: SegmentId) -> Option<Arc<Segment<C>>> {
320        self.segments
321            .read()
322            .expect("lock is poisoned")
323            .get(&id)
324            .cloned()
325    }
326
327    /// Lists all segment IDs
328    #[doc(hidden)]
329    #[must_use]
330    pub fn list_segment_ids(&self) -> Vec<SegmentId> {
331        self.segments
332            .read()
333            .expect("lock is poisoned")
334            .keys()
335            .copied()
336            .collect()
337    }
338
339    /// Lists all segments
340    #[must_use]
341    pub fn list_segments(&self) -> Vec<Arc<Segment<C>>> {
342        self.segments
343            .read()
344            .expect("lock is poisoned")
345            .values()
346            .cloned()
347            .collect()
348    }
349
350    /// Counts segments
351    #[must_use]
352    pub fn len(&self) -> usize {
353        self.segments.read().expect("lock is poisoned").len()
354    }
355
356    /// Returns the amount of bytes on disk that are occupied by blobs.
357    #[must_use]
358    pub fn disk_space_used(&self) -> u64 {
359        self.segments
360            .read()
361            .expect("lock is poisoned")
362            .values()
363            .map(|x| x.meta.compressed_bytes)
364            .sum::<u64>()
365    }
366
367    /// Returns the amount of stale bytes
368    #[must_use]
369    pub fn total_bytes(&self) -> u64 {
370        self.segments
371            .read()
372            .expect("lock is poisoned")
373            .values()
374            .map(|x| x.meta.total_uncompressed_bytes)
375            .sum::<u64>()
376    }
377
378    /// Returns the amount of stale bytes
379    #[must_use]
380    pub fn stale_bytes(&self) -> u64 {
381        self.segments
382            .read()
383            .expect("lock is poisoned")
384            .values()
385            .map(|x| x.gc_stats.stale_bytes())
386            .sum::<u64>()
387    }
388
389    /// Returns the percent of dead bytes (uncompressed) in the value log
390    #[must_use]
391    #[allow(clippy::cast_precision_loss)]
392    pub fn stale_ratio(&self) -> f32 {
393        let total_bytes = self.total_bytes();
394        if total_bytes == 0 {
395            return 0.0;
396        }
397
398        let stale_bytes = self.stale_bytes();
399
400        if stale_bytes == 0 {
401            return 0.0;
402        }
403
404        stale_bytes as f32 / total_bytes as f32
405    }
406
407    /// Returns the approximate space amplification
408    ///
409    /// Returns 0.0 if there are no items or the entire value log is stale.
410    #[must_use]
411    #[allow(clippy::cast_precision_loss)]
412    pub fn space_amp(&self) -> f32 {
413        let total_bytes = self.total_bytes();
414        if total_bytes == 0 {
415            return 0.0;
416        }
417
418        let stale_bytes = self.stale_bytes();
419
420        let alive_bytes = total_bytes - stale_bytes;
421        if alive_bytes == 0 {
422            return 0.0;
423        }
424
425        total_bytes as f32 / alive_bytes as f32
426    }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;

    /// Rewriting an existing file must fully replace its previous content.
    #[test]
    fn test_atomic_rewrite() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("test.txt");

        // Seed the target file with content that is longer than the replacement
        std::fs::write(&path, "asdasdasdasdasd")?;

        rewrite_atomic(&path, b"newcontent")?;

        assert_eq!("newcontent", std::fs::read_to_string(&path)?);

        Ok(())
    }
}
453}