use std::{collections::HashMap, sync::Arc};
use crate::types::{
level::{Level, ParquetFileMeta},
schema::TableSchema,
};
use arc_swap::{ArcSwap, Guard};
/// One data file of the table, as tracked by a [`Version`].
///
/// The `dv_*` fields presumably locate a deletion vector for this file
/// (NOTE(review): name-based guess — confirm against the writer side).
#[derive(Debug, Clone)]
pub struct DataFileMeta {
    /// Location of the data file.
    pub path: String,
    /// File-level metadata: level, sequence range, key range, row count, size, ...
    pub meta: ParquetFileMeta,
    /// Path of the associated `dv` file, if any. This is the field `has_dv` checks.
    pub dv_path: Option<String>,
    /// Offset component of the `dv` location, if any.
    pub dv_offset: Option<i64>,
    /// Length component of the `dv` location, if any.
    pub dv_length: Option<i64>,
}

impl DataFileMeta {
    /// Returns `true` exactly when `dv_path` is set.
    ///
    /// `dv_offset` / `dv_length` are not consulted, nor is the separate `dv_path`
    /// carried inside `meta`. NOTE(review): two `dv_path` fields exist; confirm
    /// which one is authoritative and that they are kept in sync.
    pub fn has_dv(&self) -> bool {
        Option::is_some(&self.dv_path)
    }
}
/// A snapshot of the table's file layout: which data files live at which level,
/// tagged with the snapshot id it corresponds to and the table schema.
#[derive(Clone, Debug)]
pub struct Version {
    /// Snapshot this layout belongs to; [`Version::empty`] uses 0.
    pub snapshot_id: i64,
    /// Files grouped by level. A level with no entry is read as empty by
    /// [`Version::files_at`].
    pub levels: HashMap<Level, Vec<DataFileMeta>>,
    /// Shared schema of the table these files belong to.
    pub schema: Arc<TableSchema>,
}

impl Version {
    /// Creates a version with snapshot id 0 and no files at any level.
    pub fn empty(schema: Arc<TableSchema>) -> Self {
        Self {
            schema,
            levels: HashMap::default(),
            snapshot_id: 0,
        }
    }

    /// Returns the files stored at `level`, or an empty slice when `levels`
    /// has no entry for it.
    pub fn files_at(&self, level: Level) -> &[DataFileMeta] {
        self.levels
            .get(&level)
            .map(Vec::as_slice)
            .unwrap_or_default()
    }

    /// Number of files currently at level 0.
    pub fn l0_file_count(&self) -> usize {
        let l0_files = self.files_at(Level(0));
        l0_files.len()
    }

    /// Sum of `meta.file_size` over all files at `level` (0 if the level is empty).
    pub fn level_bytes(&self, level: Level) -> u64 {
        self.files_at(level)
            .iter()
            .fold(0, |total, file| total + file.meta.file_size)
    }

    /// Largest level key present in `levels`, or `Level(0)` if the map is empty.
    ///
    /// NOTE(review): a level mapped to an empty `Vec` still counts toward the
    /// maximum; confirm that is what callers expect.
    pub fn max_level(&self) -> Level {
        match self.levels.keys().max() {
            Some(&highest) => highest,
            None => Level(0),
        }
    }

    /// Finds the file at `level` whose inclusive `[key_min, key_max]` range
    /// contains `user_key_bytes`, comparing keys as raw byte slices.
    ///
    /// This is a binary search (`partition_point`). It is only correct when the
    /// level's files are sorted by `key_min` and their ranges do not overlap.
    /// Level-0 files may overlap (the tests build such a layout), so results
    /// for L0 are not meaningful. NOTE(review): confirm callers only use this
    /// on sorted, non-overlapping levels.
    pub fn find_file_for_key(
        &self,
        level: Level,
        user_key_bytes: &[u8],
    ) -> Option<&DataFileMeta> {
        let files = self.files_at(level);
        // Count of leading files whose range starts at or before the key; the
        // last of these is the only file that can contain it.
        let starts_at_or_before =
            files.partition_point(|file| file.meta.key_min.as_slice() <= user_key_bytes);
        files[..starts_at_or_before]
            .last()
            .filter(|file| user_key_bytes <= file.meta.key_max.as_slice())
    }

    /// Total number of files across every level.
    pub fn total_files(&self) -> usize {
        self.levels.values().map(Vec::len).sum()
    }
}
/// Holder of the current [`Version`], backed by an `ArcSwap`.
///
/// Readers load the current version through [`VersionSet::current`] and
/// writers replace it wholesale with [`VersionSet::install`].
pub struct VersionSet {
    current: ArcSwap<Version>,
}

impl VersionSet {
    /// Creates a set whose current version is `initial`.
    pub fn new(initial: Version) -> Self {
        let current = ArcSwap::new(Arc::new(initial));
        Self { current }
    }

    /// Loads the version that is current at the time of the call.
    ///
    /// The returned guard keeps that version alive even if `install` swaps in
    /// a newer one afterwards. arc-swap recommends keeping guards short-lived.
    pub fn current(&self) -> Guard<Arc<Version>> {
        self.current.load()
    }

    /// Replaces the current version with `version`.
    ///
    /// Guards previously returned by `current` keep referring to the version
    /// they loaded.
    pub fn install(&self, version: Version) {
        let replacement = Arc::new(version);
        self.current.store(replacement);
    }

    /// Snapshot id of the version that is current at the time of the call.
    pub fn snapshot_id(&self) -> i64 {
        let version = self.current();
        version.snapshot_id
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{
        level::{Level, ParquetFileMeta},
        schema::{ColumnDef, ColumnType, TableSchema},
    };

    /// Two-column schema (`id` Int64 primary key, nullable `val` ByteArray).
    fn test_schema() -> Arc<TableSchema> {
        Arc::new(TableSchema {
            table_name: "test".into(),
            columns: vec![
                ColumnDef {
                    name: "id".into(),
                    col_type: ColumnType::Int64,
                    nullable: false,
                    ..Default::default()
                },
                ColumnDef {
                    name: "val".into(),
                    col_type: ColumnType::ByteArray,
                    nullable: true,
                    ..Default::default()
                },
            ],
            primary_key: vec![0],
            ..Default::default()
        })
    }

    /// Builds a file without any `dv` data; `file_size` is `num_rows * 100`.
    fn make_file(
        path: &str,
        level: u8,
        key_min: &[u8],
        key_max: &[u8],
        seq_min: u64,
        seq_max: u64,
        num_rows: u64,
    ) -> DataFileMeta {
        DataFileMeta {
            path: path.to_string(),
            meta: ParquetFileMeta {
                level: Level(level),
                seq_min,
                seq_max,
                key_min: key_min.to_vec(),
                key_max: key_max.to_vec(),
                num_rows,
                file_size: num_rows * 100,
                dv_path: None,
                dv_offset: None,
                dv_length: None,
                format: None,
                column_stats: None,
            },
            dv_path: None,
            dv_offset: None,
            dv_length: None,
        }
    }

    #[test]
    fn empty_version() {
        let v = Version::empty(test_schema());
        assert_eq!(v.l0_file_count(), 0);
        assert_eq!(v.total_files(), 0);
        assert_eq!(v.max_level(), Level(0));
    }

    #[test]
    fn l0_file_count() {
        let mut v = Version::empty(test_schema());
        v.levels.insert(
            Level(0),
            vec![
                make_file("f1.parquet", 0, b"\x01", b"\x05", 1, 10, 100),
                make_file("f2.parquet", 0, b"\x03", b"\x08", 11, 20, 200),
            ],
        );
        assert_eq!(v.l0_file_count(), 2);
    }

    #[test]
    fn find_file_for_key_l1() {
        let mut v = Version::empty(test_schema());
        v.levels.insert(
            Level(1),
            vec![
                make_file("a.parquet", 1, b"\x01", b"\x03", 1, 10, 100),
                make_file("b.parquet", 1, b"\x05", b"\x08", 1, 10, 100),
                make_file("c.parquet", 1, b"\x0A", b"\x0F", 1, 10, 100),
            ],
        );
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x02]).unwrap().path,
            "a.parquet"
        );
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x06]).unwrap().path,
            "b.parquet"
        );
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x0C]).unwrap().path,
            "c.parquet"
        );
        assert!(v.find_file_for_key(Level(1), &[0x04]).is_none());
        assert!(v.find_file_for_key(Level(1), &[0x00]).is_none());
        assert!(v.find_file_for_key(Level(1), &[0xFF]).is_none());
    }

    #[test]
    fn find_file_for_key_range_boundaries() {
        let mut v = Version::empty(test_schema());
        v.levels.insert(
            Level(1),
            vec![
                make_file("a.parquet", 1, b"\x01", b"\x03", 1, 10, 100),
                make_file("b.parquet", 1, b"\x05", b"\x08", 1, 10, 100),
            ],
        );
        // Both ends of a file's key range are inclusive.
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x01]).unwrap().path,
            "a.parquet"
        );
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x03]).unwrap().path,
            "a.parquet"
        );
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x05]).unwrap().path,
            "b.parquet"
        );
        assert_eq!(
            v.find_file_for_key(Level(1), &[0x08]).unwrap().path,
            "b.parquet"
        );
        // Keys compare byte-wise: a key that extends key_max sorts after it.
        assert!(v.find_file_for_key(Level(1), &[0x03, 0x00]).is_none());
        // A level with no entry behaves like an empty level.
        assert!(v.find_file_for_key(Level(2), &[0x01]).is_none());
    }

    #[test]
    fn max_level_and_total_files() {
        let mut v = Version::empty(test_schema());
        v.levels.insert(
            Level(0),
            vec![make_file("l0.parquet", 0, b"\x01", b"\x02", 1, 1, 1)],
        );
        v.levels.insert(
            Level(3),
            vec![
                make_file("x.parquet", 3, b"\x01", b"\x04", 1, 1, 1),
                make_file("y.parquet", 3, b"\x05", b"\x09", 1, 1, 1),
            ],
        );
        assert_eq!(v.max_level(), Level(3));
        assert_eq!(v.total_files(), 3);
        assert_eq!(v.l0_file_count(), 1);
        assert!(v.files_at(Level(2)).is_empty());
    }

    #[test]
    fn has_dv_follows_dv_path() {
        let mut f = make_file("f.parquet", 1, b"\x01", b"\x02", 1, 1, 1);
        assert!(!f.has_dv());
        f.dv_path = Some("f.dv".to_string());
        assert!(f.has_dv());
    }

    #[test]
    fn version_set_swap() {
        let schema = test_schema();
        let v1 = Version::empty(schema.clone());
        let vs = VersionSet::new(v1);
        assert_eq!(vs.snapshot_id(), 0);
        let mut v2 = Version::empty(schema);
        v2.snapshot_id = 42;
        v2.levels.insert(
            Level(0),
            vec![make_file("new.parquet", 0, b"\x01", b"\xFF", 1, 50, 500)],
        );
        vs.install(v2);
        assert_eq!(vs.snapshot_id(), 42);
        let guard = vs.current();
        assert_eq!(guard.l0_file_count(), 1);
    }

    #[test]
    fn guard_keeps_old_version_after_install() {
        let schema = test_schema();
        let vs = VersionSet::new(Version::empty(schema.clone()));
        // A guard taken before `install` must keep seeing the version it loaded.
        let before = vs.current();
        let mut next = Version::empty(schema);
        next.snapshot_id = 7;
        vs.install(next);
        assert_eq!(before.snapshot_id, 0);
        assert_eq!(vs.snapshot_id(), 7);
    }

    #[test]
    fn level_bytes() {
        let mut v = Version::empty(test_schema());
        v.levels.insert(
            Level(1),
            vec![
                make_file("a.parquet", 1, b"\x01", b"\x05", 1, 10, 100),
                make_file("b.parquet", 1, b"\x06", b"\x0A", 1, 10, 200),
            ],
        );
        assert_eq!(v.level_bytes(Level(1)), 100 * 100 + 200 * 100);
        assert_eq!(v.level_bytes(Level(2)), 0);
    }
}