mod blob_file_list;
mod optimize;
mod persist;
pub mod recovery;
pub mod run;
mod super_version;
pub use blob_file_list::BlobFileList;
pub use persist::persist_version;
pub use run::Run;
pub use super_version::{SuperVersion, SuperVersions};
use crate::TreeType;
use crate::blob_tree::{FragmentationEntry, FragmentationMap};
use crate::checksum::ChecksumType;
use crate::coding::Encode;
use crate::compaction::state::hidden_set::HiddenSet;
use crate::version::recovery::Recovery;
use crate::{
HashSet, KeyRange, Table, TableId,
comparator::UserComparator,
vlog::{BlobFile, BlobFileId},
};
use optimize::optimize_runs;
use run::Ranged;
use std::{ops::Deref, sync::Arc};
/// Borrowed context passed into version-transforming operations
/// (`with_new_l0_run`, `with_dropped`, `with_merge`, `with_moved`),
/// currently carrying only the user-supplied key comparator.
pub struct TransformContext<'a> {
    comparator: &'a dyn UserComparator,
}
impl<'a> TransformContext<'a> {
pub fn new(comparator: &'a dyn UserComparator) -> Self {
Self { comparator }
}
pub fn comparator(&self) -> &'a dyn UserComparator {
self.comparator
}
}
/// Number of levels a freshly created version starts with (see [`Version::new`]).
pub const DEFAULT_LEVEL_COUNT: u8 = 7;

/// Identifier of a [`Version`]; every transform produces a new version
/// with the previous ID incremented by one.
pub type VersionId = u64;
impl Ranged for Table {
    // A table's range is the key range recorded in its metadata.
    fn key_range(&self) -> &KeyRange {
        &self.metadata.key_range
    }
}
/// A single level of a version, holding zero or more sorted runs of `T`.
pub struct GenericLevel<T: Ranged> {
    runs: Vec<Arc<Run<T>>>,
}
impl<T: Ranged> std::ops::Deref for GenericLevel<T> {
    type Target = [Arc<Run<T>>];

    /// Exposes the level's runs as a slice.
    fn deref(&self) -> &Self::Target {
        self.runs.as_slice()
    }
}
impl<T: Ranged> GenericLevel<T> {
pub fn new(runs: Vec<Arc<Run<T>>>) -> Self {
Self { runs }
}
pub fn table_count(&self) -> usize {
self.iter().map(|x| x.len()).sum()
}
pub fn run_count(&self) -> usize {
self.runs.len()
}
pub fn is_disjoint(&self) -> bool {
self.run_count() == 1
}
pub fn is_empty(&self) -> bool {
self.runs.is_empty()
}
pub fn iter(&self) -> impl DoubleEndedIterator<Item = &Arc<Run<T>>> {
self.runs.iter()
}
}
/// A shared, immutable level of tables; cloning only bumps an `Arc` refcount.
#[derive(Clone)]
pub struct Level(Arc<GenericLevel<Table>>);
impl std::ops::Deref for Level {
    type Target = GenericLevel<Table>;

    /// Borrows the shared inner level.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}
impl Level {
pub fn empty() -> Self {
Self::from_runs(vec![])
}
pub fn from_runs(runs: Vec<Arc<Run<Table>>>) -> Self {
Self(Arc::new(GenericLevel { runs }))
}
pub fn list_ids(&self) -> HashSet<TableId> {
self.iter()
.flat_map(|run| run.iter())
.map(Table::id)
.collect()
}
pub fn first_run(&self) -> Option<&Arc<Run<Table>>> {
self.runs.first()
}
pub fn size(&self) -> u64 {
self.0
.iter()
.flat_map(|x| x.iter())
.map(Table::file_size)
.sum()
}
pub fn aggregate_key_range(&self) -> KeyRange {
if self.run_count() == 1 {
#[expect(
clippy::expect_used,
reason = "we check for run_count, so the first run must exist"
)]
self.runs
.first()
.expect("should exist")
.aggregate_key_range()
} else {
let key_ranges = self
.iter()
.map(|x| Run::aggregate_key_range(x))
.collect::<Vec<_>>();
KeyRange::aggregate(key_ranges.iter())
}
}
pub fn aggregate_key_range_cmp(&self, cmp: &dyn crate::comparator::UserComparator) -> KeyRange {
if self.run_count() == 1 {
#[expect(
clippy::expect_used,
reason = "we check for run_count, so the first run must exist"
)]
self.runs
.first()
.expect("should exist")
.aggregate_key_range()
} else {
let key_ranges = self
.iter()
.map(|x| Run::aggregate_key_range(x))
.collect::<Vec<_>>();
KeyRange::aggregate_cmp(key_ranges.iter(), cmp)
}
}
}
/// The internal state of a [`Version`]: the level/run layout of tables,
/// the blob file list, and blob fragmentation (GC) statistics.
pub struct VersionInner {
    // Version ID; each transform creates a successor with `id + 1`.
    id: VersionId,
    tree_type: TreeType,
    levels: Vec<Level>,
    #[doc(hidden)]
    pub blob_files: Arc<BlobFileList>,
    gc_stats: Arc<FragmentationMap>,
}
/// An immutable snapshot of the tree's table and blob-file layout.
///
/// Cloning is cheap (bumps an `Arc` refcount). Transforms such as
/// [`Version::with_merge`] return a *new* version rather than mutating.
#[derive(Clone)]
pub struct Version {
    inner: Arc<VersionInner>,
}
impl std::ops::Deref for Version {
    type Target = VersionInner;

    /// Borrows the shared inner state.
    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}
impl Version {
    /// Returns the tree type this version belongs to.
    pub fn tree_type(&self) -> TreeType {
        self.tree_type
    }

    /// Returns this version's ID.
    pub fn id(&self) -> VersionId {
        self.id
    }

    /// Returns the blob fragmentation (GC) statistics.
    pub fn gc_stats(&self) -> &FragmentationMap {
        &self.gc_stats
    }

    /// Returns level 0.
    pub fn l0(&self) -> &Level {
        #[expect(clippy::expect_used)]
        self.levels.first().expect("L0 should exist")
    }

    /// Returns `true` if any table in level `idx` is hidden
    /// (tracked in `hidden_set`); returns `false` if the level does not exist.
    #[must_use]
    pub fn level_is_busy(&self, idx: usize, hidden_set: &HiddenSet) -> bool {
        self.level(idx).is_some_and(|level| {
            level
                .iter()
                .flat_map(|run| run.iter())
                .any(|table| hidden_set.is_hidden(table.id()))
        })
    }

    /// Creates a new, empty version with [`DEFAULT_LEVEL_COUNT`] empty levels
    /// and default (empty) blob files / GC stats.
    pub fn new(id: VersionId, tree_type: TreeType) -> Self {
        let levels = (0..DEFAULT_LEVEL_COUNT).map(|_| Level::empty()).collect();

        Self {
            inner: Arc::new(VersionInner {
                id,
                tree_type,
                levels,
                blob_files: Arc::default(),
                gc_stats: Arc::default(),
            }),
        }
    }

    /// Rebuilds a version from recovered manifest data.
    ///
    /// Every persisted table ID is resolved against `tables`; a missing
    /// table yields [`crate::Error::Unrecoverable`].
    ///
    /// # Errors
    ///
    /// Returns an error if a persisted table ID cannot be resolved.
    pub(crate) fn from_recovery(
        recovery: Recovery,
        tables: &[Table],
        blob_files: &[BlobFile],
    ) -> crate::Result<Self> {
        // Reconstruct the level/run structure, replacing persisted table IDs
        // with the actual (already loaded) Table handles.
        let version_levels = recovery
            .table_ids
            .iter()
            .map(|level| {
                let level_runs = level
                    .iter()
                    .map(|run| {
                        let run_tables = run
                            .iter()
                            .map(|table| {
                                tables
                                    .iter()
                                    .find(|x| x.id() == table.id)
                                    .cloned()
                                    .ok_or(crate::Error::Unrecoverable)
                            })
                            .collect::<crate::Result<Vec<_>>>()?;

                        Ok(Arc::new(
                            #[expect(
                                clippy::expect_used,
                                reason = "empty runs should not exist, so there should not be any empty persisted runs"
                            )]
                            Run::new(run_tables).expect("persisted runs should not be empty"),
                        ))
                    })
                    .collect::<crate::Result<Vec<_>>>()?;

                Ok(Level::from_runs(level_runs))
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Self::from_levels(
            recovery.curr_version_id,
            recovery.tree_type,
            version_levels,
            BlobFileList::new(blob_files.iter().cloned().map(|bf| (bf.id(), bf)).collect()),
            recovery.gc_stats,
        ))
    }

    /// Creates a version from fully materialized parts.
    pub fn from_levels(
        id: VersionId,
        tree_type: TreeType,
        levels: Vec<Level>,
        blob_files: BlobFileList,
        gc_stats: FragmentationMap,
    ) -> Self {
        Self {
            inner: Arc::new(VersionInner {
                id,
                tree_type,
                levels,
                blob_files: Arc::new(blob_files),
                gc_stats: Arc::new(gc_stats),
            }),
        }
    }

    /// Number of levels in this version.
    pub fn level_count(&self) -> usize {
        self.levels.len()
    }

    /// Iterates over the levels, from L0 downwards.
    pub fn iter_levels(&self) -> impl Iterator<Item = &Level> {
        self.levels.iter()
    }

    /// Total number of tables across all levels.
    pub fn table_count(&self) -> usize {
        self.iter_levels().map(|x| x.table_count()).sum()
    }

    /// Number of blob files referenced by this version.
    pub fn blob_file_count(&self) -> usize {
        self.blob_files.len()
    }

    /// Iterates over every table in every run of every level.
    pub fn iter_tables(&self) -> impl Iterator<Item = &Table> {
        self.levels
            .iter()
            .flat_map(|x| x.iter())
            .flat_map(|x| x.iter())
    }

    /// Looks up a table by ID (linear scan over all tables).
    pub(crate) fn get_table(&self, id: TableId) -> Option<&Table> {
        self.iter_tables().find(|x| x.metadata.id == id)
    }

    /// Returns level `n`, if it exists.
    pub fn level(&self, n: usize) -> Option<&Level> {
        self.levels.get(n)
    }

    /// Returns a successor version with `run`'s tables flushed into L0.
    ///
    /// Each new table becomes its own single-table run, placed *before*
    /// the existing L0 runs; the result is then passed through
    /// `optimize_runs`. Optional new blob files are appended to the
    /// blob file list, and an optional fragmentation `diff` is merged
    /// into the GC stats (then pruned against the blob file list).
    pub fn with_new_l0_run(
        &self,
        run: &[Table],
        blob_files: Option<&[BlobFile]>,
        diff: Option<FragmentationMap>,
        ctx: &TransformContext<'_>,
    ) -> Self {
        let comparator = ctx.comparator;
        let id = self.id + 1;

        let mut levels = vec![];

        // Rebuild L0: new single-table runs first, then the previous runs.
        levels.push({
            #[expect(clippy::expect_used, reason = "L0 always exists")]
            let l0 = self.levels.first().expect("L0 should always exist");

            let prev_runs = l0
                .runs
                .iter()
                .map(|run| {
                    let run: Run<_> = run.deref().clone();
                    run
                })
                .collect::<Vec<_>>();

            let mut runs = Vec::with_capacity(prev_runs.len() + run.len());

            runs.extend(run.iter().cloned().map(|table| {
                let Some(run) = Run::new(vec![table]) else {
                    unreachable!("single-table run should never be empty");
                };
                run
            }));
            runs.extend(prev_runs);

            let runs = optimize_runs(runs, comparator);

            Level::from_runs(runs.into_iter().map(Arc::new).collect())
        });

        // All other levels are carried over unchanged (cheap Arc clones).
        levels.extend(self.levels.iter().skip(1).cloned());

        let value_log = if let Some(blob_files) = blob_files {
            let mut copy = self.blob_files.deref().clone();
            copy.extend(blob_files.iter().cloned().map(|bf| (bf.id(), bf)));
            copy.into()
        } else {
            self.blob_files.clone()
        };

        // Merge the fragmentation diff before pruning against the
        // (possibly extended) blob file list.
        let gc_stats = if let Some(diff) = diff {
            let mut copy = self.gc_stats.deref().clone();
            diff.merge_into(&mut copy);
            copy.prune(&value_log);
            Arc::new(copy)
        } else {
            self.gc_stats.clone()
        };

        Self {
            inner: Arc::new(VersionInner {
                id,
                tree_type: self.tree_type,
                levels,
                blob_files: value_log,
                gc_stats,
            }),
        }
    }

    /// Returns a successor version with the tables in `ids` removed.
    ///
    /// Blob-file references held by dropped tables are accounted as
    /// fragmentation in the GC stats; blob files that become fully dead
    /// are pruned from the list and pushed into `dropped_blob_files`.
    ///
    /// # Errors
    ///
    /// Fails if a dropped table's blob file references cannot be read.
    pub fn with_dropped(
        &self,
        ids: &[TableId],
        dropped_blob_files: &mut Vec<BlobFile>,
        ctx: &TransformContext<'_>,
    ) -> crate::Result<Self> {
        let comparator = ctx.comparator;
        let id = self.id + 1;

        let mut levels = vec![];
        let mut dropped_tables: Vec<Table> = vec![];

        for level in &self.levels {
            let runs = level
                .runs
                .iter()
                .map(|run| {
                    let mut run: Run<_> = run.deref().clone();

                    // Remove matching tables in place, remembering them for
                    // the GC accounting below.
                    let removed_tables = run
                        .inner_mut()
                        .extract_if(.., |x| ids.contains(&x.metadata.id));
                    dropped_tables.extend(removed_tables);

                    run
                })
                .filter(|x| !x.is_empty())
                .collect::<Vec<_>>();

            let runs = optimize_runs(runs, comparator);

            levels.push(Level::from_runs(runs.into_iter().map(Arc::new).collect()));
        }

        // Account dropped tables' blob references as dead (fragmented) data.
        let gc_stats = if dropped_tables.is_empty() {
            self.gc_stats.clone()
        } else {
            let mut copy = self.gc_stats.deref().clone();

            for table in &dropped_tables {
                let linked_blob_files = table.list_blob_file_references()?.unwrap_or_default();

                for blob_file in linked_blob_files {
                    copy.entry(blob_file.blob_file_id)
                        .and_modify(|counter| {
                            counter.bytes += blob_file.bytes;
                            counter.len += blob_file.len;
                        })
                        .or_insert_with(|| {
                            FragmentationEntry::new(
                                blob_file.len,
                                blob_file.bytes,
                                blob_file.on_disk_bytes,
                            )
                        });
                }
            }

            Arc::new(copy)
        };

        // Prune blob files that are now fully dead, handing them to the caller.
        let value_log = if dropped_tables.is_empty() {
            self.blob_files.clone()
        } else {
            let mut copy = self.blob_files.deref().clone();
            dropped_blob_files.extend(copy.prune_dead(&gc_stats));
            Arc::new(copy)
        };

        Ok(Self {
            inner: Arc::new(VersionInner {
                id,
                tree_type: self.tree_type,
                levels,
                blob_files: value_log,
                gc_stats,
            }),
        })
    }

    /// Returns a successor version reflecting a compaction/merge:
    /// tables in `old_ids` are removed everywhere, and `new_tables`
    /// are inserted as a single run into `dest_level` (appended for L0,
    /// prepended for deeper levels). New blob files are registered,
    /// `blob_files_to_drop` are removed, and the optional fragmentation
    /// `diff` is merged into the GC stats before pruning.
    #[expect(
        clippy::too_many_arguments,
        reason = "merge requires blob/GC params alongside context; further bundling planned"
    )]
    pub fn with_merge(
        &self,
        old_ids: &[TableId],
        new_tables: &[Table],
        dest_level: usize,
        diff: Option<FragmentationMap>,
        new_blob_files: Vec<BlobFile>,
        blob_files_to_drop: &HashSet<BlobFileId>,
        ctx: &TransformContext<'_>,
    ) -> Self {
        let comparator = ctx.comparator;
        let id = self.id + 1;

        let mut levels = vec![];

        for (level_idx, level) in self.levels.iter().enumerate() {
            let mut runs = level
                .runs
                .iter()
                .map(|run| {
                    let mut run: Run<_> = run.deref().clone();
                    run.retain(|x| !old_ids.contains(&x.metadata.id));
                    run
                })
                .filter(|x| !x.is_empty())
                .collect::<Vec<_>>();

            // Insert the merged output run into the destination level.
            if level_idx == dest_level
                && let Some(run) = Run::new(new_tables.to_vec())
            {
                if dest_level == 0 {
                    runs.push(run);
                } else {
                    runs.insert(0, run);
                }
            }

            let runs = optimize_runs(runs, comparator);

            levels.push(Level::from_runs(runs.into_iter().map(Arc::new).collect()));
        }

        let has_diff = diff.is_some();

        let value_log = if has_diff || !new_blob_files.is_empty() || !blob_files_to_drop.is_empty()
        {
            let mut copy = self.blob_files.deref().clone();

            for blob_file in new_blob_files {
                copy.insert(blob_file.id(), blob_file);
            }

            for &id in blob_files_to_drop {
                copy.remove(id);
            }

            Arc::new(copy)
        } else {
            self.blob_files.clone()
        };

        let gc_stats = if has_diff || !blob_files_to_drop.is_empty() {
            let mut copy = self.gc_stats.deref().clone();

            if let Some(diff) = diff {
                diff.merge_into(&mut copy);
            }

            copy.prune(&value_log);

            Arc::new(copy)
        } else {
            self.gc_stats.clone()
        };

        Self {
            inner: Arc::new(VersionInner {
                id,
                tree_type: self.tree_type,
                levels,
                blob_files: value_log,
                gc_stats,
            }),
        }
    }

    /// Returns a successor version with the tables in `ids` moved into
    /// `dest_level` as a new run at the front of that level. Blob files
    /// and GC stats are unchanged.
    ///
    /// # Panics
    ///
    /// Panics if any ID in `ids` does not resolve to a table in this version.
    pub fn with_moved(
        &self,
        ids: &[TableId],
        dest_level: usize,
        ctx: &TransformContext<'_>,
    ) -> Self {
        let comparator = ctx.comparator;
        let id = self.id + 1;

        let affected_tables = self
            .iter_tables()
            .filter(|x| ids.contains(&x.id()))
            .cloned()
            .collect::<Vec<_>>();

        assert_eq!(affected_tables.len(), ids.len(), "invalid table IDs");

        let mut levels = vec![];

        for (level_idx, level) in self.levels.iter().enumerate() {
            // Remove the moved tables wherever they currently live…
            let mut runs = level
                .runs
                .iter()
                .map(|run| {
                    let mut run: Run<_> = run.deref().clone();
                    run.retain(|x| !ids.contains(&x.metadata.id));
                    run
                })
                .filter(|x| !x.is_empty())
                .collect::<Vec<_>>();

            // …and re-insert them as one run at the destination level.
            if level_idx == dest_level
                && let Some(run) = Run::new(affected_tables.clone())
            {
                runs.insert(0, run);
            }

            let runs = optimize_runs(runs, comparator);

            levels.push(Level::from_runs(runs.into_iter().map(Arc::new).collect()));
        }

        Self {
            inner: Arc::new(VersionInner {
                id,
                tree_type: self.tree_type,
                levels,
                blob_files: self.blob_files.clone(),
                gc_stats: self.gc_stats.clone(),
            }),
        }
    }
}
impl Version {
    /// Serializes this version into a section-based `sfa` writer.
    ///
    /// Sections are written in a fixed order that the decoder relies on:
    /// `format_version`, `crate_version`, `tree_type`, `level_count`,
    /// `filter_hash_type`, `comparator_name`, `tables`, `blob_files`,
    /// `blob_gc_stats`. All multi-byte integers are little-endian.
    ///
    /// # Errors
    ///
    /// Returns an error if writing to the underlying sink fails.
    pub(crate) fn encode_into(
        &self,
        writer: &mut sfa::Writer<impl std::io::Write + std::io::Seek>,
        comparator_name: &str,
    ) -> Result<(), crate::Error> {
        use crate::FormatVersion;
        use byteorder::{LittleEndian, WriteBytesExt};
        use std::io::Write;

        writer.start("format_version")?;
        writer.write_u8(FormatVersion::V4.into())?;

        writer.start("crate_version")?;
        writer.write_all(env!("CARGO_PKG_VERSION").as_bytes())?;

        writer.start("tree_type")?;
        writer.write_u8(self.tree_type.into())?;

        writer.start("level_count")?;
        #[expect(
            clippy::cast_possible_truncation,
            reason = "level count is bounded by 255"
        )]
        writer.write_u8(self.level_count() as u8)?;

        writer.start("filter_hash_type")?;
        writer.write_u8(u8::from(ChecksumType::Xxh3))?;

        writer.start("comparator_name")?;
        writer.write_all(comparator_name.as_bytes())?;

        // Table layout: level count again (header of this section), then per
        // level the run count, per run the table count, then per table its
        // ID, a tag byte, checksum, and global seqno.
        writer.start("tables")?;
        #[expect(
            clippy::cast_possible_truncation,
            reason = "there are always less than 256 levels"
        )]
        writer.write_u8(self.level_count() as u8)?;

        for level in self.iter_levels() {
            #[expect(
                clippy::cast_possible_truncation,
                reason = "there are always less than 256 runs"
            )]
            writer.write_u8(level.len() as u8)?;

            for run in level.iter() {
                #[expect(
                    clippy::cast_possible_truncation,
                    reason = "there are always less than 4 billion tables in a run"
                )]
                writer.write_u32::<LittleEndian>(run.len() as u32)?;

                for table in run.iter() {
                    writer.write_u64::<LittleEndian>(table.id())?;
                    // NOTE(review): the 0 byte before the checksum looks like a
                    // tag/reserved byte — confirm its meaning against the decoder.
                    writer.write_u8(0)?; writer.write_u128::<LittleEndian>(table.checksum().into_u128())?;
                    writer.write_u64::<LittleEndian>(table.global_seqno())?;
                }
            }
        }

        writer.start("blob_files")?;
        #[expect(
            clippy::cast_possible_truncation,
            reason = "there are always less than 4 billion blob files"
        )]
        writer.write_u32::<LittleEndian>(self.blob_files.len() as u32)?;

        for file in self.blob_files.iter() {
            writer.write_u64::<LittleEndian>(file.id())?;
            // Same tag-byte-then-checksum shape as the tables section above.
            writer.write_u8(0)?; writer.write_u128::<LittleEndian>(file.0.checksum.into_u128())?;
        }

        // GC stats encode themselves via the `Encode` trait.
        writer.start("blob_gc_stats")?;
        self.gc_stats.encode_into(writer)?;

        Ok(())
    }
}