use crate::{
blob_cache::BlobCache,
id::{IdGenerator, SegmentId},
index::Writer as IndexWriter,
manifest::{SegmentManifest, SEGMENTS_FOLDER, VLOG_MARKER},
path::absolute_path,
segment::merge::MergeReader,
value::UserValue,
version::Version,
Config, IndexReader, SegmentWriter, ValueHandle,
};
use byteorder::{BigEndian, ReadBytesExt};
use std::{
collections::BTreeMap,
fs::File,
io::{BufReader, Read, Seek},
path::PathBuf,
sync::{atomic::AtomicU64, Arc, Mutex},
};
/// Numeric identifier of a value log instance.
#[allow(clippy::module_name_repetitions)]
pub type ValueLogId = u64;

/// Returns a fresh value log ID.
///
/// IDs are taken from a process-wide atomic counter that starts at 0 and is
/// incremented by one on every call.
pub fn get_next_vlog_id() -> ValueLogId {
    use std::sync::atomic::Ordering;

    static NEXT_VLOG_ID: AtomicU64 = AtomicU64::new(0);

    // `fetch_add` returns the value *before* the increment, so the first
    // caller gets 0
    NEXT_VLOG_ID.fetch_add(1, Ordering::Relaxed)
}
/// A value log handle.
///
/// This is a thin wrapper around an `Arc<ValueLogInner>`, so cloning it only
/// clones the `Arc`; all clones refer to the same underlying state.
#[derive(Clone)]
pub struct ValueLog(Arc<ValueLogInner>);
impl std::ops::Deref for ValueLog {
type Target = ValueLogInner;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// State of a value log; `ValueLog` shares it behind an `Arc`.
#[allow(clippy::module_name_repetitions)]
pub struct ValueLogInner {
/// ID of this value log instance, taken from `get_next_vlog_id` when the
/// instance is created or recovered; it is part of the blob cache key
id: u64,
/// Base folder of the value log
path: PathBuf,
/// Configuration the value log was opened with
config: Config,
/// Cache for values read from segments (cloned from the configuration)
blob_cache: Arc<BlobCache>,
/// Manifest that keeps track of the segments
#[doc(hidden)]
pub manifest: SegmentManifest,
/// Generator for segment IDs; a clone is handed to every new segment writer
id_generator: IdGenerator,
/// Held for the whole duration of `rollover` and `scan_for_stats`,
/// so these operations never run concurrently
rollover_guard: Mutex<()>,
}
impl ValueLog {
/// Opens the value log located in the given folder.
///
/// If the folder already contains the version marker file, the existing
/// value log is recovered, otherwise a new one is created.
///
/// # Errors
///
/// Returns an error if checking for the marker file fails, or if
/// recovering/creating the value log fails.
pub fn open<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
    let path = path.into();

    // The marker file tells an existing value log apart from a fresh folder
    let marker_exists = path.join(VLOG_MARKER).try_exists()?;
    if !marker_exists {
        return Self::create_new(path, config);
    }

    Self::recover(path, config)
}
pub(crate) fn create_new<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
let path = absolute_path(path.into());
log::trace!("Creating value-log at {}", path.display());
std::fs::create_dir_all(&path)?;
let marker_path = path.join(VLOG_MARKER);
assert!(!marker_path.try_exists()?);
std::fs::create_dir_all(path.join(SEGMENTS_FOLDER))?;
let mut file = std::fs::File::create(marker_path)?;
Version::V1.write_file_header(&mut file)?;
file.sync_all()?;
#[cfg(not(target_os = "windows"))]
{
let folder = std::fs::File::open(path.join(SEGMENTS_FOLDER))?;
folder.sync_all()?;
let folder = std::fs::File::open(&path)?;
folder.sync_all()?;
}
let blob_cache = config.blob_cache.clone();
let manifest = SegmentManifest::create_new(&path)?;
Ok(Self(Arc::new(ValueLogInner {
id: get_next_vlog_id(),
config,
path,
blob_cache,
manifest,
id_generator: IdGenerator::default(),
rollover_guard: Mutex::new(()),
})))
}
/// Recovers an existing value log from the given folder.
///
/// The version marker is validated first, then the segment manifest is
/// recovered. The segment ID generator is set up to continue after the
/// highest segment ID found in the manifest.
///
/// # Errors
///
/// Returns `Error::InvalidVersion` if the marker cannot be parsed or does
/// not contain version 1, or an error if reading the marker or recovering
/// the manifest fails.
///
/// # Panics
///
/// Panics if the segments lock of the manifest is poisoned.
pub(crate) fn recover<P: Into<PathBuf>>(path: P, config: Config) -> crate::Result<Self> {
    let path = path.into();
    log::info!("Recovering vLog at {}", path.display());

    {
        let marker_bytes = std::fs::read(path.join(VLOG_MARKER))?;

        let Some(version) = Version::parse_file_header(&marker_bytes) else {
            return Err(crate::Error::InvalidVersion(None));
        };

        if version != Version::V1 {
            return Err(crate::Error::InvalidVersion(Some(version)));
        }
    }

    let blob_cache = config.blob_cache.clone();
    let manifest = SegmentManifest::recover(&path)?;

    // Falls back to the default ID if the manifest contains no segments
    let highest_id = {
        let segments = manifest.segments.read().expect("lock is poisoned");
        segments
            .values()
            .map(|segment| segment.id)
            .max()
            .unwrap_or_default()
    };

    let inner = ValueLogInner {
        id: get_next_vlog_id(),
        config,
        path,
        blob_cache,
        manifest,
        id_generator: IdGenerator::new(highest_id + 1),
        rollover_guard: Mutex::new(()),
    };

    Ok(Self(Arc::new(inner)))
}
/// Registers the given segment writer in the manifest.
///
/// # Errors
///
/// Returns an error if the manifest fails to register the writer.
pub fn register_writer<W: IndexWriter>(&self, writer: SegmentWriter<W>) -> crate::Result<()> {
self.manifest.register(writer)
}
/// Returns the number of segments, as reported by the manifest.
#[must_use]
pub fn segment_count(&self) -> usize {
self.manifest.len()
}
/// Resolves a value handle to its value.
///
/// Returns `Ok(None)` if the manifest does not know the segment the handle
/// points into. Otherwise the blob cache is consulted first; on a cache
/// miss the value is read from the segment file at `handle.offset`,
/// decompressed according to the segment's compression type, stored in
/// the blob cache and returned.
///
/// # Errors
///
/// Returns an error if the segment file cannot be opened or read, or if
/// decompressing the value fails.
pub fn get(&self, handle: &ValueHandle) -> crate::Result<Option<UserValue>> {
    let segment = match self.manifest.get_segment(handle.segment_id) {
        Some(segment) => segment,
        None => return Ok(None),
    };

    if let Some(cached) = self.blob_cache.get(&((self.id, handle.clone()).into())) {
        return Ok(Some(cached));
    }

    let file = File::open(&segment.path)?;
    let mut reader = BufReader::new(file);
    reader.seek(std::io::SeekFrom::Start(handle.offset))?;

    // NOTE: The leading 4 bytes (bound to `_crc`) are consumed,
    // but not verified against the value
    let _crc = reader.read_u32::<BigEndian>()?;

    let val_len = reader.read_u32::<BigEndian>()?;

    let mut raw = vec![0; val_len as usize];
    reader.read_exact(&mut raw)?;

    let decompressed = match segment.meta.compression {
        crate::CompressionType::None => raw,
        #[cfg(feature = "lz4")]
        crate::CompressionType::Lz4 => lz4_flex::decompress_size_prepended(&raw)
            .map_err(|_| crate::Error::Decompress(segment.meta.compression))?,
        #[cfg(feature = "miniz")]
        crate::CompressionType::Miniz(_) => miniz_oxide::inflate::decompress_to_vec(&raw)
            .map_err(|_| crate::Error::Decompress(segment.meta.compression))?,
    };

    let user_value: UserValue = decompressed.into();

    // Remember the value for subsequent lookups of the same handle
    self.blob_cache
        .insert((self.id, handle.clone()).into(), user_value.clone());

    Ok(Some(user_value))
}
/// Creates a new segment writer that writes into the segments folder of
/// this value log.
///
/// The writer gets a clone of the segment ID generator, the segment size
/// and compression from the configuration, and the given index writer.
///
/// # Errors
///
/// Returns an error if the segment writer cannot be created.
pub fn get_writer<W: IndexWriter>(&self, index_writer: W) -> crate::Result<SegmentWriter<W>> {
    let id_generator = self.id_generator.clone();
    let segment_size = self.config.segment_size_bytes;
    let segments_folder = self.path.join(SEGMENTS_FOLDER);
    let compression = self.config.compression;

    let writer = SegmentWriter::new(
        id_generator,
        segment_size,
        segments_folder,
        index_writer,
        compression,
    )?;

    Ok(writer)
}
/// Selects segments to garbage-collect so that the space amplification
/// drops to at most `space_amp_target`.
///
/// Segments are considered in order of descending stale ratio. After each
/// pick, the projected space amplification is recomputed as if the stale
/// bytes of the picked segment were gone; the selection stops as soon as
/// the projection is at or below the target.
///
/// Returns an empty list if the current space amplification already is at
/// or below the target.
///
/// # Panics
///
/// Panics if the segments lock of the manifest is poisoned.
#[must_use]
pub fn select_segments_for_space_amp_reduction(&self, space_amp_target: f32) -> Vec<SegmentId> {
    let current_space_amp = self.space_amp();

    // A target that is met exactly needs no GC. This is the same criterion
    // the selection loop below uses to stop, and what the log message states.
    if current_space_amp <= space_amp_target {
        log::trace!("Space amp is <= target {space_amp_target}, nothing to do");
        return vec![];
    }

    log::debug!("Selecting segments to GC, space_amp_target={space_amp_target}");

    let lock = self.manifest.segments.read().expect("lock is poisoned");
    let mut segments = lock.values().collect::<Vec<_>>();

    // Sort by stale ratio, descending; incomparable ratios (NaN) are
    // treated as equal
    segments.sort_by(|a, b| {
        b.stale_ratio()
            .partial_cmp(&a.stale_ratio())
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    let mut selection = vec![];

    let mut total_bytes = self.manifest.total_bytes();
    let mut stale_bytes = self.manifest.stale_bytes();

    for segment in segments {
        // Collecting the segment removes its stale bytes
        // from both the stale and the total byte count
        let segment_stale_bytes = segment.gc_stats.stale_bytes();
        stale_bytes -= segment_stale_bytes;
        total_bytes -= segment_stale_bytes;

        selection.push(segment.id);

        let space_amp_after_gc = total_bytes as f32 / (total_bytes as f32 - stale_bytes as f32);

        log::debug!(
            "Selected segment #{} for GC: will reduce space amp to {space_amp_after_gc}",
            segment.id
        );

        if space_amp_after_gc <= space_amp_target {
            break;
        }
    }

    selection
}
/// Returns the IDs of all segments whose stale ratio is greater than or
/// equal to `threshold`.
///
/// # Panics
///
/// Panics if the segments lock of the manifest is poisoned.
#[must_use]
pub fn find_segments_with_stale_threshold(&self, threshold: f32) -> Vec<SegmentId> {
    let segments = self.manifest.segments.read().expect("lock is poisoned");

    let mut matching = Vec::new();

    for segment in segments.values() {
        if segment.stale_ratio() >= threshold {
            matching.push(segment.id);
        }
    }

    matching
}
/// Drops all segments that are flagged as stale from the manifest.
///
/// # Errors
///
/// Returns an error if the manifest fails to drop the segments.
///
/// # Panics
///
/// Panics if the segments lock of the manifest is poisoned.
pub fn drop_stale_segments(&self) -> crate::Result<()> {
    // The read lock is only held while collecting the IDs,
    // and released before the manifest is asked to drop the segments
    let ids = {
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        let mut stale = Vec::new();
        for segment in segments.values() {
            if segment.is_stale() {
                stale.push(segment.id);
            }
        }
        stale
    };

    log::debug!("Dropping blob files: {ids:?}");

    self.manifest.drop_segments(&ids)?;

    Ok(())
}
/// Flags the segments with the given IDs as stale.
///
/// IDs that are not present in the manifest are ignored.
///
/// # Panics
///
/// Panics if the segments lock of the manifest is poisoned.
fn mark_as_stale(&self, ids: &[SegmentId]) {
    let segments = self.manifest.segments.read().expect("lock is poisoned");

    for segment in ids.iter().filter_map(|id| segments.get(id)) {
        segment.mark_as_stale();
    }
}
/// Returns the space amplification, as reported by the manifest.
#[must_use]
pub fn space_amp(&self) -> f32 {
self.manifest.space_amp()
}
/// Recomputes the GC statistics of all segments from the index contents
/// and logs a GC report.
///
/// `iter` yields every value handle the index currently references,
/// together with the size of the referenced value. For each referenced
/// segment the number of stale bytes and stale items is derived from the
/// segment metadata and stored in its GC stats. Segments that are not
/// referenced by any handle are flagged as stale.
///
/// The rollover guard is held for the whole scan, so it does not run
/// concurrently with a rollover or another scan.
///
/// # Errors
///
/// Returns the I/O error if `iter` yields one; the scan is aborted then.
///
/// # Panics
///
/// Panics if a lock is poisoned, or if a handle references a segment that
/// is not present in the manifest.
pub fn scan_for_stats(
    &self,
    iter: impl Iterator<Item = std::io::Result<(ValueHandle, u32)>>,
) -> crate::Result<()> {
    /// Bytes and items of a segment that are still referenced by the index
    struct SegmentCounter {
        size: u64,
        item_count: u64,
    }

    let _guard = self.rollover_guard.lock().expect("lock is poisoned");

    log::info!("--- GC report for vLog @ {:?} ---", self.path);

    let mut size_map = BTreeMap::<SegmentId, SegmentCounter>::new();

    for handle in iter {
        // Propagate the index error as-is instead of replacing it with a
        // generic one, so the caller can see what actually went wrong
        let (handle, size) = handle?;
        let size = u64::from(size);

        size_map
            .entry(handle.segment_id)
            .and_modify(|x| {
                x.item_count += 1;
                x.size += size;
            })
            .or_insert_with(|| SegmentCounter {
                size,
                item_count: 1,
            });
    }

    for (&id, counter) in &size_map {
        let used_size = counter.size;
        let alive_item_count = counter.item_count;

        let segment = self.manifest.get_segment(id).expect("segment should exist");

        let total_bytes = segment.meta.total_uncompressed_bytes;
        let stale_bytes = total_bytes - used_size;

        let total_items = segment.meta.item_count;
        let stale_items = total_items - alive_item_count;

        let space_amp = total_bytes as f64 / used_size as f64;
        let stale_ratio = stale_bytes as f64 / total_bytes as f64;

        log::info!(
            "Blob file #{id} has {}/{} stale MiB ({:.1}% stale, {stale_items}/{total_items} items) - space amp: {space_amp})",
            stale_bytes / 1_024 / 1_024,
            total_bytes / 1_024 / 1_024,
            stale_ratio * 100.0
        );

        segment.gc_stats.set_stale_bytes(stale_bytes);
        segment.gc_stats.set_stale_items(stale_items);
    }

    {
        // Take the segments read lock exactly once and work on the guard
        // directly: acquiring the same `RwLock` for reading again on this
        // thread while the guard is alive may deadlock or panic
        let segments = self.manifest.segments.read().expect("lock is poisoned");

        for (id, segment) in segments.iter() {
            if !size_map.contains_key(id) {
                log::info!(
                    "Blob file #{id} has no incoming references - can be dropped, freeing {} KiB on disk (userdata={} MiB)",
                    segment.meta.compressed_bytes / 1_024,
                    segment.meta.total_uncompressed_bytes / 1_024 / 1_024
                );
                segment.mark_as_stale();
            }
        }

        // The guard is released here, before the manifest is queried
        // for the summary below
    }

    log::info!("Total bytes: {}", self.manifest.total_bytes());
    log::info!("Stale bytes: {}", self.manifest.stale_bytes());
    log::info!("Space amp: {}", self.space_amp());
    log::info!("--- GC report done ---");

    Ok(())
}
/// Creates a reader that merges the scans of all segments in the manifest.
///
/// # Errors
///
/// Returns the I/O error of the first segment scan that fails to start.
///
/// # Panics
///
/// Panics if the segments lock of the manifest is poisoned.
#[doc(hidden)]
pub fn get_reader(&self) -> std::io::Result<MergeReader> {
    let segments = self.manifest.segments.read().expect("lock is poisoned");

    let mut readers = Vec::new();
    for segment in segments.values() {
        readers.push(segment.scan()?);
    }

    Ok(MergeReader::new(readers))
}
/// Rolls over all segments that are currently listed in the manifest.
///
/// # Errors
///
/// Returns an error if the rollover fails.
#[doc(hidden)]
pub fn major_compact<R: IndexReader, W: IndexWriter>(
&self,
index_reader: &R,
index_writer: W,
) -> crate::Result<()> {
let ids = self.manifest.list_segment_ids();
self.rollover(&ids, index_reader, index_writer)
}
/// Rewrites the still-referenced values of the given segments into new
/// segments and flags the given segments as stale.
///
/// Every item of the given segments is looked up in the index. Items the
/// index does not know, or that the index resolves to a segment with a
/// higher ID than the one being read, are skipped; all other items are
/// written to a new segment writer. The writer is registered in the
/// manifest before the old segments are flagged as stale.
///
/// Does nothing if `ids` is empty or if any of the IDs is not present in
/// the manifest. The rollover guard is held for the whole operation.
///
/// # Errors
///
/// Returns an error if reading a segment, querying the index, writing an
/// item or registering the writer fails.
///
/// # Panics
///
/// Panics if a lock is poisoned.
pub fn rollover<R: IndexReader, W: IndexWriter>(
    &self,
    ids: &[u64],
    index_reader: &R,
    index_writer: W,
) -> crate::Result<()> {
    if ids.is_empty() {
        return Ok(());
    }

    let _guard = self.rollover_guard.lock().expect("lock is poisoned");

    log::info!("Rollover segments {ids:?}");

    // All requested segments have to exist, otherwise nothing is done
    let Some(segments) = ids
        .iter()
        .map(|&id| self.manifest.get_segment(id))
        .collect::<Option<Vec<_>>>()
    else {
        return Ok(());
    };

    let scanners = segments
        .into_iter()
        .map(|segment| segment.scan())
        .collect::<std::io::Result<Vec<_>>>()?;

    let reader = MergeReader::new(scanners);
    let mut writer = self.get_writer(index_writer)?;

    for item in reader {
        let (key, value, segment_id) = item?;

        // Not referenced by the index anymore -> drop the item
        let Some(current) = index_reader.get(&key)? else {
            continue;
        };

        // The index points into a segment with a higher ID -> drop the item
        if segment_id < current.segment_id {
            continue;
        }

        writer.write(&key, &value)?;
    }

    // The new segments are registered first, only then
    // the old ones are flagged as stale
    self.manifest.register(writer)?;
    self.mark_as_stale(ids);

    Ok(())
}
}