1use crate::{
6 id::SegmentId,
7 key_range::KeyRange,
8 segment::{gc_stats::GcStats, meta::Metadata, trailer::SegmentFileTrailer},
9 Compressor, HashMap, Segment, SegmentWriter as MultiWriter,
10};
11use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
12use std::{
13 io::{Cursor, Write},
14 marker::PhantomData,
15 path::{Path, PathBuf},
16 sync::{Arc, RwLock},
17};
18
/// Marker name `.vlog`.
///
/// NOTE(review): not referenced in this part of the file; presumably the name of a
/// marker file identifying a value log folder - confirm against callers.
pub const VLOG_MARKER: &str = ".vlog";
/// Name of the sub-folder (relative to the value log folder) in which segment files
/// are looked up during recovery.
pub const SEGMENTS_FOLDER: &str = "segments";
/// File name of the segment manifest inside the value log folder.
const MANIFEST_FILE: &str = "vlog_manifest";
22
23fn rewrite_atomic<P: AsRef<Path>>(path: P, content: &[u8]) -> std::io::Result<()> {
25 let path = path.as_ref();
26 let folder = path.parent().expect("should have a parent");
27
28 let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
29 temp_file.write_all(content)?;
30 temp_file.persist(path)?;
31
32 #[cfg(not(target_os = "windows"))]
33 {
34 let file = std::fs::File::open(path)?;
37 file.sync_all()?;
38 }
39
40 Ok(())
41}
42
/// Shared state of a [`SegmentManifest`].
#[allow(clippy::module_name_repetitions)]
pub struct SegmentManifestInner<C: Compressor + Clone> {
    /// Path of the manifest file that the list of segment IDs is written to
    path: PathBuf,

    /// Currently registered segments, keyed by segment ID
    pub segments: RwLock<HashMap<SegmentId, Arc<Segment<C>>>>,
}
48
/// Cheaply cloneable handle to a [`SegmentManifestInner`].
///
/// Cloning only clones the inner `Arc`, so all clones share the same segment map
/// and manifest path.
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct SegmentManifest<C: Compressor + Clone>(Arc<SegmentManifestInner<C>>);
52
53impl<C: Compressor + Clone> std::ops::Deref for SegmentManifest<C> {
54 type Target = SegmentManifestInner<C>;
55
56 fn deref(&self) -> &Self::Target {
57 &self.0
58 }
59}
60
61impl<C: Compressor + Clone> SegmentManifest<C> {
62 fn remove_unfinished_segments<P: AsRef<Path>>(
63 folder: P,
64 registered_ids: &[u64],
65 ) -> crate::Result<()> {
66 for dirent in std::fs::read_dir(folder)? {
67 let dirent = dirent?;
68
69 if dirent.file_name() == ".DS_Store" {
71 continue;
72 }
73
74 if dirent.file_type()?.is_file() {
75 let segment_id = dirent
76 .file_name()
77 .to_str()
78 .expect("should be valid utf-8")
79 .parse::<u64>()
80 .expect("should be valid segment ID");
81
82 if !registered_ids.contains(&segment_id) {
83 log::trace!("Deleting unfinished vLog segment {segment_id}");
84 std::fs::remove_file(dirent.path())?;
85 }
86 }
87 }
88
89 Ok(())
90 }
91
92 fn load_ids_from_disk<P: AsRef<Path>>(path: P) -> crate::Result<Vec<SegmentId>> {
94 let path = path.as_ref();
95 log::debug!("Loading manifest from {}", path.display());
96
97 let bytes = std::fs::read(path)?;
98
99 let mut ids = vec![];
100
101 let mut cursor = Cursor::new(bytes);
102
103 let cnt = cursor.read_u64::<BigEndian>()?;
104
105 for _ in 0..cnt {
106 ids.push(cursor.read_u64::<BigEndian>()?);
107 }
108
109 Ok(ids)
110 }
111
112 pub(crate) fn recover<P: AsRef<Path>>(folder: P) -> crate::Result<Self> {
114 let folder = folder.as_ref();
115 let manifest_path = folder.join(MANIFEST_FILE);
116
117 log::info!("Recovering vLog at {folder:?}");
118
119 let ids = Self::load_ids_from_disk(&manifest_path)?;
120 let cnt = ids.len();
121
122 let progress_mod = match cnt {
123 _ if cnt <= 20 => 1,
124 _ if cnt <= 100 => 10,
125 _ => 100,
126 };
127
128 log::debug!("Recovering {cnt} vLog segments from {folder:?}");
129
130 let segments_folder = folder.join(SEGMENTS_FOLDER);
131 Self::remove_unfinished_segments(&segments_folder, &ids)?;
132
133 let segments = {
134 let mut map =
135 HashMap::with_capacity_and_hasher(100, xxhash_rust::xxh3::Xxh3Builder::new());
136
137 for (idx, &id) in ids.iter().enumerate() {
138 log::trace!("Recovering segment #{id:?}");
139
140 let path = segments_folder.join(id.to_string());
141 let trailer = SegmentFileTrailer::from_file(&path)?;
142
143 map.insert(
144 id,
145 Arc::new(Segment {
146 id,
147 path,
148 meta: trailer.metadata,
149 gc_stats: GcStats::default(),
150 _phantom: PhantomData,
151 }),
152 );
153
154 if idx % progress_mod == 0 {
155 log::debug!("Recovered {idx}/{cnt} vLog segments");
156 }
157 }
158
159 map
160 };
161
162 if segments.len() < ids.len() {
163 return Err(crate::Error::Unrecoverable);
164 }
165
166 Ok(Self(Arc::new(SegmentManifestInner {
167 path: manifest_path,
168 segments: RwLock::new(segments),
169 })))
170 }
171
172 pub(crate) fn create_new<P: AsRef<Path>>(folder: P) -> crate::Result<Self> {
173 let path = folder.as_ref().join(MANIFEST_FILE);
174
175 let m = Self(Arc::new(SegmentManifestInner {
176 path,
177 segments: RwLock::new(HashMap::default()),
178 }));
179 Self::write_to_disk(&m.path, &[])?;
180
181 Ok(m)
182 }
183
184 pub(crate) fn atomic_swap<F: FnOnce(&mut HashMap<SegmentId, Arc<Segment<C>>>)>(
186 &self,
187 f: F,
188 ) -> crate::Result<()> {
189 let mut prev_segments = self.segments.write().expect("lock is poisoned");
190
191 let mut working_copy = prev_segments.clone();
196
197 f(&mut working_copy);
198
199 let ids = working_copy.keys().copied().collect::<Vec<_>>();
200
201 Self::write_to_disk(&self.path, &ids)?;
202 *prev_segments = working_copy;
203
204 drop(prev_segments);
207
208 log::trace!("Swapped vLog segment list to: {ids:?}");
209
210 Ok(())
211 }
212
213 pub fn clear(&self) -> crate::Result<()> {
219 self.atomic_swap(|recipe| {
220 recipe.clear();
221 })
222 }
223
224 pub fn drop_segments(&self, ids: &[u64]) -> crate::Result<()> {
230 self.atomic_swap(|recipe| {
231 recipe.retain(|x, _| !ids.contains(x));
232 })
233 }
234
235 pub fn register(&self, writer: MultiWriter<C>) -> crate::Result<()> {
236 let writers = writer.finish()?;
237
238 self.atomic_swap(move |recipe| {
239 for writer in writers {
240 if writer.item_count == 0 {
241 log::debug!(
242 "Writer at {:?} has written no data, deleting empty vLog segment file",
243 writer.path
244 );
245 if let Err(e) = std::fs::remove_file(&writer.path) {
246 log::warn!(
247 "Could not delete empty vLog segment file at {:?}: {e:?}",
248 writer.path
249 );
250 };
251 continue;
252 }
253
254 let segment_id = writer.segment_id;
255
256 recipe.insert(
257 segment_id,
258 Arc::new(Segment {
259 id: segment_id,
260 path: writer.path,
261 meta: Metadata {
262 item_count: writer.item_count,
263 compressed_bytes: writer.written_blob_bytes,
264 total_uncompressed_bytes: writer.uncompressed_bytes,
265
266 #[allow(clippy::expect_used)]
269 key_range: KeyRange::new((
270 writer
271 .first_key
272 .clone()
273 .expect("should have written at least 1 item"),
274 writer
275 .last_key
276 .clone()
277 .expect("should have written at least 1 item"),
278 )),
279 },
280 gc_stats: GcStats::default(),
281 _phantom: PhantomData,
282 }),
283 );
284
285 log::debug!(
286 "Created segment #{segment_id:?} ({} items, {} userdata bytes)",
287 writer.item_count,
288 writer.uncompressed_bytes,
289 );
290 }
291 })?;
292
293 Ok(())
297 }
298
299 fn write_to_disk<P: AsRef<Path>>(path: P, segment_ids: &[SegmentId]) -> crate::Result<()> {
300 let path = path.as_ref();
301 log::trace!("Writing segment manifest to {}", path.display());
302
303 let mut bytes = Vec::new();
304
305 let cnt = segment_ids.len() as u64;
306 bytes.write_u64::<BigEndian>(cnt)?;
307
308 for id in segment_ids {
309 bytes.write_u64::<BigEndian>(*id)?;
310 }
311
312 rewrite_atomic(path, &bytes)?;
313
314 Ok(())
315 }
316
317 #[must_use]
319 pub fn get_segment(&self, id: SegmentId) -> Option<Arc<Segment<C>>> {
320 self.segments
321 .read()
322 .expect("lock is poisoned")
323 .get(&id)
324 .cloned()
325 }
326
327 #[doc(hidden)]
329 #[must_use]
330 pub fn list_segment_ids(&self) -> Vec<SegmentId> {
331 self.segments
332 .read()
333 .expect("lock is poisoned")
334 .keys()
335 .copied()
336 .collect()
337 }
338
339 #[must_use]
341 pub fn list_segments(&self) -> Vec<Arc<Segment<C>>> {
342 self.segments
343 .read()
344 .expect("lock is poisoned")
345 .values()
346 .cloned()
347 .collect()
348 }
349
350 #[must_use]
352 pub fn len(&self) -> usize {
353 self.segments.read().expect("lock is poisoned").len()
354 }
355
356 #[must_use]
358 pub fn disk_space_used(&self) -> u64 {
359 self.segments
360 .read()
361 .expect("lock is poisoned")
362 .values()
363 .map(|x| x.meta.compressed_bytes)
364 .sum::<u64>()
365 }
366
367 #[must_use]
369 pub fn total_bytes(&self) -> u64 {
370 self.segments
371 .read()
372 .expect("lock is poisoned")
373 .values()
374 .map(|x| x.meta.total_uncompressed_bytes)
375 .sum::<u64>()
376 }
377
378 #[must_use]
380 pub fn stale_bytes(&self) -> u64 {
381 self.segments
382 .read()
383 .expect("lock is poisoned")
384 .values()
385 .map(|x| x.gc_stats.stale_bytes())
386 .sum::<u64>()
387 }
388
389 #[must_use]
391 #[allow(clippy::cast_precision_loss)]
392 pub fn stale_ratio(&self) -> f32 {
393 let total_bytes = self.total_bytes();
394 if total_bytes == 0 {
395 return 0.0;
396 }
397
398 let stale_bytes = self.stale_bytes();
399
400 if stale_bytes == 0 {
401 return 0.0;
402 }
403
404 stale_bytes as f32 / total_bytes as f32
405 }
406
407 #[must_use]
411 #[allow(clippy::cast_precision_loss)]
412 pub fn space_amp(&self) -> f32 {
413 let total_bytes = self.total_bytes();
414 if total_bytes == 0 {
415 return 0.0;
416 }
417
418 let stale_bytes = self.stale_bytes();
419
420 let alive_bytes = total_bytes - stale_bytes;
421 if alive_bytes == 0 {
422 return 0.0;
423 }
424
425 total_bytes as f32 / alive_bytes as f32
426 }
427}
428
#[cfg(test)]
mod tests {
    use super::*;
    use test_log::test;

    /// An existing file is fully replaced by the new content.
    #[test]
    fn test_atomic_rewrite() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("test.txt");

        // Seed the file with content that is longer than the replacement
        std::fs::write(&path, "asdasdasdasdasd")?;

        rewrite_atomic(&path, b"newcontent")?;

        assert_eq!("newcontent", std::fs::read_to_string(&path)?);

        Ok(())
    }
}
453}