use std::sync::Arc;
use arrow_schema::SchemaRef;
use uuid::Uuid;
use crate::dataset::Dataset;
use crate::dataset::mem_wal::write::{BatchStore, IndexStore};
/// Generation number of a data layer: the base table or a MemTable.
///
/// The raw value `0` is reserved for the base table
/// ([`LsmGeneration::BASE_TABLE`]). The derived ordering compares the raw
/// number, so the base table sorts before every MemTable generation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LsmGeneration(u64);

impl LsmGeneration {
    /// Generation reserved for the base table (raw value `0`).
    pub const BASE_TABLE: Self = Self(0);

    /// Builds a MemTable generation from its raw number.
    ///
    /// # Panics
    ///
    /// Panics when `generation` is `0`, because that value is reserved for
    /// the base table.
    pub fn memtable(generation: u64) -> Self {
        assert!(
            generation != Self::BASE_TABLE.0,
            "MemTable generation must be >= 1 (0 is reserved for base table)"
        );
        Self(generation)
    }

    /// Returns the raw generation number.
    pub fn as_u64(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }

    /// Returns `true` when this generation is [`LsmGeneration::BASE_TABLE`].
    pub fn is_base_table(&self) -> bool {
        *self == Self::BASE_TABLE
    }
}
impl From<u64> for LsmGeneration {
fn from(value: u64) -> Self {
Self(value)
}
}
/// Renders the base table as `base` and every other generation as `gen<N>`,
/// where `<N>` is the raw generation number.
impl std::fmt::Display for LsmGeneration {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.is_base_table() {
            return f.write_str("base");
        }
        write!(f, "gen{}", self.as_u64())
    }
}
impl Default for LsmGeneration {
fn default() -> Self {
Self::BASE_TABLE
}
}
/// One entry of `RegionSnapshot::flushed_generations`: a generation number
/// paired with a path.
#[derive(Debug, Clone)]
pub struct FlushedGeneration {
/// Raw generation number of this entry.
pub generation: u64,
/// Path associated with this generation.
/// NOTE(review): presumably where the flushed data was written; what the
/// path is relative to is not visible here, so confirm against the writer.
pub path: String,
}
/// Point-in-time description of a region: its id, spec id, current
/// generation, and the list of flushed generations.
///
/// Built with [`RegionSnapshot::new`] followed by the chained `with_*`
/// methods, each of which consumes and returns the snapshot.
#[derive(Debug, Clone)]
pub struct RegionSnapshot {
    /// Identifier of the region.
    pub region_id: Uuid,
    /// Spec id of the region; `0` unless overridden.
    pub spec_id: u32,
    /// Current generation number; `1` unless overridden.
    pub current_generation: u64,
    /// Flushed generations, in the order they were added.
    pub flushed_generations: Vec<FlushedGeneration>,
}

impl RegionSnapshot {
    /// Creates a snapshot for `region_id` with `spec_id` 0, current
    /// generation 1, and no flushed generations.
    pub fn new(region_id: Uuid) -> Self {
        Self {
            region_id,
            spec_id: 0,
            current_generation: 1,
            flushed_generations: vec![],
        }
    }

    /// Returns the snapshot with its spec id replaced by `spec_id`.
    pub fn with_spec_id(self, spec_id: u32) -> Self {
        Self { spec_id, ..self }
    }

    /// Returns the snapshot with its current generation replaced by
    /// `generation`.
    pub fn with_current_generation(self, generation: u64) -> Self {
        Self {
            current_generation: generation,
            ..self
        }
    }

    /// Returns the snapshot with one more flushed generation appended.
    ///
    /// Entries are kept in insertion order; no sorting or de-duplication is
    /// performed.
    pub fn with_flushed_generation(mut self, generation: u64, path: String) -> Self {
        let entry = FlushedGeneration { generation, path };
        self.flushed_generations.push(entry);
        self
    }
}
/// A data source at one of three levels: the base table, a flushed
/// MemTable, or an active MemTable.
///
/// Only the two MemTable variants carry a region id and an explicit
/// generation; the accessors on this type report the base table as
/// `LsmGeneration::BASE_TABLE` with no region id.
pub enum LsmDataSource {
/// The base table, backed by a shared `Dataset`.
BaseTable {
/// Shared handle to the base dataset.
dataset: Arc<Dataset>,
},
/// A MemTable that has been flushed.
FlushedMemTable {
/// Path of the flushed MemTable.
/// NOTE(review): what this path is relative to is not visible here; confirm.
path: String,
/// Region this MemTable belongs to.
region_id: Uuid,
/// Generation of this MemTable.
generation: LsmGeneration,
},
/// A MemTable that is still active.
/// NOTE(review): presumably held in memory, given the store handles; confirm.
ActiveMemTable {
/// Shared handle to the MemTable's batch store.
batch_store: Arc<BatchStore>,
/// Shared handle to the MemTable's index store.
index_store: Arc<IndexStore>,
/// Arrow schema associated with this MemTable.
schema: SchemaRef,
/// Region this MemTable belongs to.
region_id: Uuid,
/// Generation of this MemTable.
generation: LsmGeneration,
},
}
impl LsmDataSource {
    /// Returns the generation of this source.
    ///
    /// The base table always reports [`LsmGeneration::BASE_TABLE`]; the
    /// MemTable variants report the generation they were constructed with.
    pub fn generation(&self) -> LsmGeneration {
        match self {
            Self::BaseTable { .. } => LsmGeneration::BASE_TABLE,
            Self::FlushedMemTable { generation, .. }
            | Self::ActiveMemTable { generation, .. } => *generation,
        }
    }

    /// Returns the region id of a MemTable source, or `None` for the base
    /// table, which carries no region id.
    pub fn region_id(&self) -> Option<Uuid> {
        match self {
            Self::BaseTable { .. } => None,
            Self::FlushedMemTable { region_id, .. }
            | Self::ActiveMemTable { region_id, .. } => Some(*region_id),
        }
    }

    /// Returns `true` when this source is the base table.
    pub fn is_base_table(&self) -> bool {
        matches!(self, Self::BaseTable { .. })
    }

    /// Returns `true` when this source is an active MemTable.
    pub fn is_active_memtable(&self) -> bool {
        matches!(self, Self::ActiveMemTable { .. })
    }

    /// Returns a short human-readable label for this source.
    ///
    /// * base table: `base_table`
    /// * flushed MemTable: `flushed[<id8>:<generation>]`
    /// * active MemTable: `memtable[<id8>:<generation>]`
    ///
    /// `<id8>` is the first 8 characters of the region id's `Display` form
    /// and `<generation>` is the generation's `Display` form.
    pub fn display_name(&self) -> String {
        match self {
            Self::BaseTable { .. } => "base_table".to_string(),
            Self::FlushedMemTable {
                region_id,
                generation,
                ..
            } => format!(
                "flushed[{}:{}]",
                Self::short_region_id(region_id),
                generation
            ),
            Self::ActiveMemTable {
                region_id,
                generation,
                ..
            } => format!(
                "memtable[{}:{}]",
                Self::short_region_id(region_id),
                generation
            ),
        }
    }

    /// Returns the first 8 characters of the region id's `Display` form.
    ///
    /// `String::truncate` is a no-op when the text is shorter than 8 bytes,
    /// so unlike a `[..8]` slice this cannot panic on a short string.
    fn short_region_id(region_id: &Uuid) -> String {
        let mut text = region_id.to_string();
        text.truncate(8);
        text
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The base table sorts before every MemTable generation, MemTable
    /// generations sort numerically, and `Display` / `as_u64` agree with the
    /// raw number.
    #[test]
    fn test_lsm_generation_ordering() {
        let base = LsmGeneration::BASE_TABLE;
        let [gen1, gen2, gen10] = [1, 2, 10].map(LsmGeneration::memtable);

        for memtable in [gen1, gen2, gen10] {
            assert!(base < memtable);
        }
        assert!(gen1 < gen2);
        assert!(gen2 < gen10);

        for (generation, text) in [(base, "base"), (gen1, "gen1"), (gen10, "gen10")] {
            assert_eq!(generation.to_string(), text);
        }
        for (generation, raw) in [(base, 0), (gen1, 1), (gen10, 10)] {
            assert_eq!(generation.as_u64(), raw);
        }
    }

    /// `From<u64>` wraps the raw value, and `0` maps to the base table.
    #[test]
    fn test_lsm_generation_conversions() {
        let five = LsmGeneration::from(5u64);
        assert_eq!(five.as_u64(), 5);

        let zero = LsmGeneration::from(0u64);
        assert!(zero.is_base_table());
    }

    /// `memtable(0)` must panic because generation 0 is reserved.
    #[test]
    #[should_panic(expected = "MemTable generation must be >= 1")]
    fn test_memtable_generation_zero_panics() {
        let _ = LsmGeneration::memtable(0);
    }

    /// Every builder method sets its field, and flushed generations keep
    /// their insertion order.
    #[test]
    fn test_region_snapshot_builder() {
        let region_id = Uuid::new_v4();
        let snapshot = RegionSnapshot::new(region_id)
            .with_spec_id(1)
            .with_current_generation(5)
            .with_flushed_generation(1, "abc123_gen_1".to_string())
            .with_flushed_generation(2, "def456_gen_2".to_string());

        assert_eq!(snapshot.region_id, region_id);
        assert_eq!(snapshot.spec_id, 1);
        assert_eq!(snapshot.current_generation, 5);

        let flushed: Vec<u64> = snapshot
            .flushed_generations
            .iter()
            .map(|entry| entry.generation)
            .collect();
        assert_eq!(flushed, [1, 2]);
    }
}