use crate::SstCompressionAlgorithm;
use crate::data_file::DataFileType;
use crate::error::{Error, Result};
use crate::time::TimeProviderKind;
use crate::util::{normalize_storage_path_to_url, size_to_u64, size_to_usize};
use config::{Config as ConfigLoader, File as ConfigFile, FileFormat as ConfigFileFormat};
use log::warn;
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use size::Size;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use toml::Value as TomlValue;
use url::Url;
/// Default for `ReaderConfigEntry::reload_tolerance_seconds`.
const DEFAULT_READ_PROXY_RELOAD_TOLERANCE_SECONDS: u64 = 10;
fn deserialize_volume_kinds<'de, D>(deserializer: D) -> Result<u8, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum KindsInput {
Mask(u8),
MaskString(String),
List(Vec<String>),
}
let input = KindsInput::deserialize(deserializer)?;
let mut mask = 0u8;
let add_kind = |mask: &mut u8, kind: VolumeUsageKind| {
*mask |= kind.mask();
};
let mut parse_values = |values: Vec<String>| -> Result<u8, D::Error> {
for value in values {
let normalized = value.trim().to_lowercase().replace('-', "_");
match normalized.as_str() {
"meta" => add_kind(&mut mask, VolumeUsageKind::Meta),
"primary_data_priority_high" => {
add_kind(&mut mask, VolumeUsageKind::PrimaryDataPriorityHigh);
}
"primary_data_priority_medium" => {
add_kind(&mut mask, VolumeUsageKind::PrimaryDataPriorityMedium);
}
"primary_data_priority_low" => {
add_kind(&mut mask, VolumeUsageKind::PrimaryDataPriorityLow);
}
"snapshot" => add_kind(&mut mask, VolumeUsageKind::Snapshot),
"cache" => add_kind(&mut mask, VolumeUsageKind::Cache),
"readonly" => add_kind(&mut mask, VolumeUsageKind::Readonly),
_ => {
return Err(serde::de::Error::custom(format!(
"Unknown volume usage kind: {}",
value
)));
}
}
}
Ok(mask)
};
match input {
KindsInput::Mask(value) => Ok(value),
KindsInput::MaskString(value) => {
let trimmed = value.trim();
if trimmed.is_empty() {
return Ok(0);
}
if trimmed.contains(',') {
let values = trimmed
.split(',')
.map(|entry| entry.trim().to_string())
.collect();
parse_values(values)
} else if let Ok(parsed) = trimmed.parse::<u8>() {
Ok(parsed)
} else {
parse_values(vec![trimmed.to_string()])
}
}
KindsInput::List(values) => parse_values(values),
}
}
/// Compaction candidate-selection policy (serialized in snake_case).
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CompactionPolicyKind {
    RoundRobin,
    MinOverlap,
}
/// Policy for choosing what to offload from a primary volume once the
/// offload watermark is reached. Defaults to `Priority`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum PrimaryVolumeOffloadPolicyKind {
    LargestFile,
    #[default]
    Priority,
}
/// Memtable implementation to use. Defaults to `Hash`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum MemtableType {
    #[default]
    Hash,
    Skiplist,
    Vec,
}
/// Roles a volume can serve. Each variant's discriminant selects one bit in
/// the `VolumeDescriptor::kinds` bitmask (see `VolumeUsageKind::mask`).
#[repr(u8)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum VolumeUsageKind {
    Meta = 0,
    PrimaryDataPriorityHigh = 1,
    PrimaryDataPriorityMedium = 2,
    PrimaryDataPriorityLow = 3,
    Snapshot = 4,
    Cache = 5,
    Readonly = 6,
}
impl VolumeUsageKind {
    /// Single-bit mask for this kind within a `kinds` bitmask.
    fn mask(self) -> u8 {
        1 << (self as u8)
    }
}
/// Describes one storage volume and the usage kinds it serves.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct VolumeDescriptor {
    // Root location; normalized to URL form by `Config::normalize_volume_paths`.
    pub base_dir: String,
    // Optional credentials — presumably for remote object-store volumes;
    // confirm against the storage backend that consumes them.
    pub access_id: Option<String>,
    pub secret_key: Option<String>,
    // Optional capacity cap for this volume.
    pub size_limit: Option<Size>,
    // Backend-specific options (tests pass e.g. "endpoint"/"region").
    pub custom_options: Option<HashMap<String, String>>,
    // Bitmask of supported `VolumeUsageKind`s; accepts a mask number, a
    // string, or a list of kind names on deserialization.
    #[serde(deserialize_with = "deserialize_volume_kinds")]
    pub kinds: u8,
}
impl VolumeDescriptor {
pub fn new(base_dir: impl Into<String>, kinds: Vec<VolumeUsageKind>) -> Self {
let mut descriptor = Self {
base_dir: base_dir.into(),
access_id: None,
secret_key: None,
size_limit: None,
custom_options: None,
kinds: 0,
};
for kind in kinds {
descriptor.set_usage(kind);
}
descriptor
}
pub fn single_volume(base_dir: impl Into<String>) -> Vec<Self> {
vec![Self::new(
base_dir,
vec![
VolumeUsageKind::PrimaryDataPriorityHigh,
VolumeUsageKind::Meta,
],
)]
}
pub fn set_usage(&mut self, kind: VolumeUsageKind) {
self.kinds |= kind.mask();
}
pub fn supports(&self, kind: VolumeUsageKind) -> bool {
(self.kinds & kind.mask()) != 0
}
pub(crate) fn size_limit_bytes(&self) -> Result<Option<u64>> {
self.size_limit
.map(|size| size_to_u64("volumes[].size_limit", size))
.transpose()
.map_err(Error::ConfigError)
}
}
fn supports_primary_data(volume: &VolumeDescriptor) -> bool {
volume.supports(VolumeUsageKind::PrimaryDataPriorityHigh)
|| volume.supports(VolumeUsageKind::PrimaryDataPriorityMedium)
|| volume.supports(VolumeUsageKind::PrimaryDataPriorityLow)
}
/// Read-proxy tuning knobs nested under the `reader` config key.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ReaderConfigEntry {
    // Number of partitions to keep pinned in memory (per field name;
    // confirm semantics against the reader implementation).
    pub pin_partition_in_memory_count: usize,
    // Reader-side block cache capacity.
    pub block_cache_size: Size,
    // Seconds of staleness tolerated before a reload.
    pub reload_tolerance_seconds: u64,
}
impl Default for ReaderConfigEntry {
    /// Defaults: one pinned partition, a 512 MiB block cache, and the
    /// module-wide reload tolerance.
    fn default() -> Self {
        Self {
            pin_partition_in_memory_count: 1,
            block_cache_size: Size::from_mib(512),
            reload_tolerance_seconds: DEFAULT_READ_PROXY_RELOAD_TOLERANCE_SECONDS,
        }
    }
}
impl ReaderConfigEntry {
    /// `block_cache_size` converted to bytes; errors if the size is invalid.
    pub(crate) fn block_cache_size_bytes(&self) -> Result<usize> {
        size_to_usize("reader.block_cache_size", self.block_cache_size).map_err(Error::ConfigError)
    }
}
/// Per-read options, optionally restricting which columns are read.
#[derive(Clone, Debug)]
pub struct ReadOptions {
    // Columns to read; `None` selects all columns.
    pub column_indices: Option<Vec<usize>>,
    // Largest index in `column_indices`, precomputed by the constructors.
    max_index: Option<usize>,
    // Lazily built column bitmasks, rebuilt when the column count changes.
    // Shared via Arc so `clone` keeps one cache per logical options value.
    cached_masks: Arc<Mutex<Option<ReadOptionsMasks>>>,
}
/// Per-scan options: optional column projection plus a read-ahead hint.
#[derive(Clone, Debug, Default)]
pub struct ScanOptions {
    // Read-ahead budget for sequential scans.
    pub read_ahead_bytes: Size,
    // Columns to scan; `None` selects all columns.
    pub column_indices: Option<Vec<usize>>,
    // Largest index in `column_indices`, precomputed by the constructors.
    max_index: Option<usize>,
}
/// Options controlling write behavior.
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    // Optional time-to-live for the written entries, in seconds.
    pub ttl_seconds: Option<u32>,
}
impl WriteOptions {
    /// Builds write options carrying the given TTL in seconds.
    pub fn with_ttl(ttl_seconds: u32) -> Self {
        WriteOptions {
            ttl_seconds: Some(ttl_seconds),
        }
    }
}
/// Bitmask view of a column selection, built for a fixed column count.
#[derive(Clone, Debug)]
pub(crate) struct ReadOptionsMasks {
    // Column count these masks were built for (cache key).
    pub(crate) num_columns: usize,
    // Mask of explicitly selected columns; `None` when no projection is set.
    pub(crate) selected_mask: Option<Arc<[u8]>>,
    // Effective mask: the selection, or all valid columns when none is set.
    pub(crate) base_mask: Arc<[u8]>,
}
impl Default for ReadOptions {
    /// No projection (all columns) and an empty mask cache.
    fn default() -> Self {
        Self {
            column_indices: None,
            max_index: None,
            cached_masks: Arc::new(Mutex::new(None)),
        }
    }
}
impl ScanOptions {
    /// Scan restricted to a single column.
    pub fn for_column(column_index: usize) -> Self {
        Self::new_with_indices(Some(vec![column_index]))
    }
    /// Scan restricted to the given columns.
    pub fn for_columns(column_indices: Vec<usize>) -> Self {
        Self::new_with_indices(Some(column_indices))
    }
    fn new_with_indices(column_indices: Option<Vec<usize>>) -> Self {
        // Precompute the largest requested index for cheap validation later.
        let max_index = match column_indices.as_deref() {
            Some(indices) => indices.iter().copied().max(),
            None => None,
        };
        Self {
            read_ahead_bytes: Size::from_const(0),
            column_indices,
            max_index,
        }
    }
    /// The projected columns, if any.
    pub(crate) fn columns(&self) -> Option<&[usize]> {
        self.column_indices.as_deref()
    }
    /// Largest projected column index, if a projection is set.
    pub(crate) fn max_index(&self) -> Option<usize> {
        self.max_index
    }
    /// Read-ahead budget converted to bytes.
    pub(crate) fn read_ahead_bytes(&self) -> Result<usize> {
        size_to_usize("scan.read_ahead_bytes", self.read_ahead_bytes).map_err(Error::ConfigError)
    }
}
impl ReadOptions {
    /// Read options restricted to a single column.
    pub fn for_column(column_index: usize) -> Self {
        Self::new_with_indices(Some(vec![column_index]))
    }
    /// Read options restricted to the given columns.
    pub fn for_columns(column_indices: Vec<usize>) -> Self {
        Self::new_with_indices(Some(column_indices))
    }
    fn new_with_indices(column_indices: Option<Vec<usize>>) -> Self {
        // Precompute the largest requested index for cheap validation later.
        let max_index = column_indices
            .as_ref()
            .and_then(|indices| indices.iter().max().copied());
        Self {
            column_indices,
            max_index,
            cached_masks: Arc::new(Mutex::new(None)),
        }
    }
    /// The projected columns, if any.
    pub(crate) fn columns(&self) -> Option<&[usize]> {
        self.column_indices.as_deref()
    }
    /// Largest projected column index, if a projection is set.
    pub(crate) fn max_index(&self) -> Option<usize> {
        self.max_index
    }
    /// Returns the column masks for `num_columns`, rebuilding the cached
    /// value whenever the column count differs from the cached one.
    pub(crate) fn masks(&self, num_columns: usize) -> ReadOptionsMasks {
        let mut guard = self.cached_masks.lock().unwrap();
        if guard
            .as_ref()
            .map(|mask| mask.num_columns != num_columns)
            .unwrap_or(true)
        {
            *guard = Some(self.build_masks(num_columns));
        }
        guard.as_ref().expect("cached mask initialized").clone()
    }
    fn build_masks(&self, num_columns: usize) -> ReadOptionsMasks {
        let mask_size = num_columns.div_ceil(8).max(1);
        // Valid-bit mask for the final byte. The previous formula
        // `(1u8 << ((num_columns - 1) % 8 + 1)) - 1` shifted by 8 whenever
        // num_columns was a multiple of 8 — a shift overflow that panics in
        // debug builds and produces a wrong (0x00) mask in release — and
        // underflowed for num_columns == 0. Handle both cases explicitly:
        // a full last byte is entirely valid, and zero columns have no
        // valid bits.
        let last_mask = if num_columns == 0 {
            0u8
        } else {
            match num_columns % 8 {
                0 => 0xFFu8,
                bits => (1u8 << bits) - 1,
            }
        };
        let selected_mask = self.column_indices.as_ref().map(|columns| {
            let mut mask = vec![0u8; mask_size];
            for &column_idx in columns {
                // Out-of-range requests are ignored here; callers can use
                // `max_index` to detect them.
                if column_idx < num_columns {
                    mask[column_idx / 8] |= 1 << (column_idx % 8);
                }
            }
            mask[mask_size - 1] &= last_mask;
            Arc::from(mask.into_boxed_slice())
        });
        // Without an explicit projection, every valid column is selected.
        let base_mask = if let Some(mask) = selected_mask.as_ref() {
            Arc::clone(mask)
        } else {
            let mut mask = vec![0xFF; mask_size];
            mask[mask_size - 1] &= last_mask;
            Arc::from(mask.into_boxed_slice())
        };
        ReadOptionsMasks {
            num_columns,
            selected_mask,
            base_mask,
        }
    }
}
/// Top-level engine configuration.
///
/// Loaded via `Config::from_path` / `Config::from_json_str`, which layer the
/// provided values over `Config::default()`; size fields are validated by
/// `validate_sizes` at load time.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Config {
    // --- Storage volumes ---
    pub volumes: Vec<VolumeDescriptor>,
    // --- Memtable ---
    pub memtable_capacity: Size,
    pub memtable_buffer_count: usize,
    pub memtable_type: MemtableType,
    // --- Layout ---
    pub num_columns: usize,
    pub total_buckets: u32,
    // --- LSM shape / write stalls ---
    pub l0_file_limit: usize,
    // See `resolved_write_stall_limit` for how `None`/invalid values resolve.
    pub write_stall_limit: Option<usize>,
    pub l1_base_bytes: Size,
    pub level_size_multiplier: usize,
    pub max_level: u8,
    // --- Compaction ---
    pub compaction_policy: CompactionPolicyKind,
    pub compaction_read_ahead_enabled: bool,
    pub compaction_remote_addr: Option<String>,
    pub compaction_threads: usize,
    pub compaction_remote_timeout_ms: u64,
    pub compaction_server_max_concurrent: usize,
    pub compaction_server_max_queued: usize,
    // --- Block cache (optional hybrid memory+disk tier) ---
    pub block_cache_size: Size,
    pub block_cache_hybrid_enabled: bool,
    // Disk tier size; when `None`, falls back to the memory capacity
    // (see `hybrid_block_cache_disk_size`).
    pub block_cache_hybrid_disk_size: Option<Size>,
    // --- Reader ---
    pub reader: ReaderConfigEntry,
    // --- Data files ---
    pub base_file_size: Size,
    pub sst_bloom_filter_enabled: bool,
    pub sst_bloom_bits_per_key: u32,
    pub sst_partitioned_index: bool,
    pub data_file_type: DataFileType,
    pub parquet_row_group_size_bytes: Size,
    // Per-level compression; levels past the end reuse the last entry.
    pub sst_compression_by_level: Vec<SstCompressionAlgorithm>,
    // --- TTL / value separation ---
    pub ttl_enabled: bool,
    pub default_ttl_seconds: Option<u32>,
    pub value_separation_threshold: Option<Size>,
    // --- Misc ---
    pub time_provider: TimeProviderKind,
    pub log_path: Option<String>,
    pub log_console: bool,
    pub log_level: log::LevelFilter,
    // --- Snapshots / primary-volume watermarks ---
    pub snapshot_on_flush: bool,
    pub active_memtable_incremental_snapshot_ratio: f64,
    pub lsm_split_trigger_level: Option<u8>,
    pub primary_volume_write_stop_watermark: f64,
    pub primary_volume_offload_trigger_watermark: f64,
    pub primary_volume_offload_policy: PrimaryVolumeOffloadPolicyKind,
    pub snapshot_retention: Option<usize>,
}
impl Default for Config {
    /// Baseline configuration; loaders layer user-provided values on top of
    /// these defaults.
    fn default() -> Self {
        Self {
            volumes: VolumeDescriptor::single_volume("file:///tmp/cobble"),
            memtable_capacity: Size::from_mib(64),
            memtable_buffer_count: 2,
            memtable_type: MemtableType::Hash,
            num_columns: 1,
            total_buckets: 1,
            l0_file_limit: 4,
            write_stall_limit: None,
            l1_base_bytes: Size::from_mib(64),
            level_size_multiplier: 10,
            max_level: 6,
            compaction_policy: CompactionPolicyKind::RoundRobin,
            compaction_read_ahead_enabled: true,
            compaction_remote_addr: None,
            compaction_threads: 4,
            compaction_remote_timeout_ms: 300_000,
            compaction_server_max_concurrent: 4,
            compaction_server_max_queued: 64,
            block_cache_size: Size::from_mib(64),
            block_cache_hybrid_enabled: false,
            block_cache_hybrid_disk_size: None,
            reader: ReaderConfigEntry::default(),
            base_file_size: Size::from_mib(64),
            sst_bloom_filter_enabled: false,
            sst_bloom_bits_per_key: 10,
            sst_partitioned_index: false,
            data_file_type: DataFileType::SSTable,
            parquet_row_group_size_bytes: Size::from_kib(256),
            // No compression for L0/L1, Lz4 from L2 down.
            sst_compression_by_level: vec![
                SstCompressionAlgorithm::None,
                SstCompressionAlgorithm::None,
                SstCompressionAlgorithm::Lz4,
            ],
            ttl_enabled: false,
            default_ttl_seconds: None,
            value_separation_threshold: None,
            time_provider: TimeProviderKind::default(),
            log_path: None,
            log_console: false,
            log_level: log::LevelFilter::Info,
            snapshot_on_flush: false,
            active_memtable_incremental_snapshot_ratio: 0.0,
            lsm_split_trigger_level: None,
            primary_volume_write_stop_watermark: 0.95,
            primary_volume_offload_trigger_watermark: 0.85,
            primary_volume_offload_policy: PrimaryVolumeOffloadPolicyKind::Priority,
            snapshot_retention: None,
        }
    }
}
/// Resolved placement of the hybrid block cache's disk tier
/// (see `Config::resolve_hybrid_cache_volume_plan`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct HybridCacheVolumePlan {
    // Index into `Config::volumes` of the chosen cache volume.
    pub(crate) volume_idx: usize,
    // Normalized (URL) base directory of that volume.
    pub(crate) base_dir: String,
    // Bytes reserved on disk for the cache tier.
    pub(crate) disk_capacity_bytes: usize,
    // True when the volume also stores primary data, in which case its size
    // limit is partitioned (see `apply_hybrid_cache_primary_partition_with_plan`).
    pub(crate) shared_with_primary: bool,
}
impl Config {
    /// Returns a copy of this config with every volume `base_dir` rewritten
    /// into canonical URL form.
    pub(crate) fn normalize_volume_paths(&self) -> Result<Self> {
        let mut normalized = self.clone();
        for volume in normalized.volumes.iter_mut() {
            volume.base_dir = normalize_storage_path_to_url(&volume.base_dir)?;
        }
        Ok(normalized)
    }
}
impl Config {
    /// Parses a configuration from a JSON string.
    ///
    /// Unknown keys are only warned about (via `warn!`), not rejected.
    /// Provided values are layered over `Config::default()` through the
    /// `config` crate, and all size fields are validated before returning.
    pub fn from_json_str(contents: &str) -> Result<Self> {
        let provided = serde_json::from_str::<JsonValue>(contents)
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        // The serialized default config acts as the schema for unknown-key
        // detection.
        let schema = serde_json::to_value(Config::default())
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        let unrecognized = collect_unrecognized_entry_paths(&provided, &schema, "");
        for entry in unrecognized {
            warn!("unrecognized entry: {}", entry);
        }
        let default_json = serde_json::to_string(&Config::default())
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        let provided_json =
            serde_json::to_string(&provided).map_err(|err| Error::ConfigError(err.to_string()))?;
        // Defaults first, provided values second, so user entries win.
        let mut builder = ConfigLoader::builder();
        builder = builder.add_source(ConfigFile::from_str(&default_json, ConfigFileFormat::Json));
        builder = builder.add_source(ConfigFile::from_str(&provided_json, ConfigFileFormat::Json));
        let config: Config = builder
            .build()
            .map_err(|err| Error::ConfigError(err.to_string()))?
            .try_deserialize()
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        config.validate_sizes()?;
        Ok(config)
    }
    /// `memtable_capacity` converted to bytes.
    pub(crate) fn memtable_capacity_bytes(&self) -> Result<usize> {
        size_to_usize("memtable_capacity", self.memtable_capacity).map_err(Error::ConfigError)
    }
    /// `l1_base_bytes` converted to bytes.
    pub(crate) fn l1_base_bytes_bytes(&self) -> Result<usize> {
        size_to_usize("l1_base_bytes", self.l1_base_bytes).map_err(Error::ConfigError)
    }
    /// `block_cache_size` converted to bytes.
    pub(crate) fn block_cache_size_bytes(&self) -> Result<usize> {
        size_to_usize("block_cache_size", self.block_cache_size).map_err(Error::ConfigError)
    }
    /// `block_cache_hybrid_disk_size` converted to bytes, if configured.
    pub(crate) fn block_cache_hybrid_disk_size_bytes(&self) -> Result<Option<usize>> {
        self.block_cache_hybrid_disk_size
            .map(|size| size_to_usize("block_cache_hybrid_disk_size", size))
            .transpose()
            .map_err(Error::ConfigError)
    }
    /// `base_file_size` converted to bytes.
    pub(crate) fn base_file_size_bytes(&self) -> Result<usize> {
        size_to_usize("base_file_size", self.base_file_size).map_err(Error::ConfigError)
    }
    /// `parquet_row_group_size_bytes` converted to bytes.
    pub(crate) fn parquet_row_group_size_bytes(&self) -> Result<usize> {
        size_to_usize(
            "parquet_row_group_size_bytes",
            self.parquet_row_group_size_bytes,
        )
        .map_err(Error::ConfigError)
    }
    /// `value_separation_threshold` in bytes; 0 when unset (i.e. disabled).
    pub(crate) fn value_separation_threshold_bytes(&self) -> Result<usize> {
        self.value_separation_threshold
            .map(|size| size_to_usize("value_separation_threshold", size))
            .transpose()
            .map_err(Error::ConfigError)
            .map(|size| size.unwrap_or(0))
    }
    /// Disk capacity for the hybrid block cache, or `None` when the hybrid
    /// cache is disabled or the memory capacity is zero. Falls back to the
    /// memory capacity when no explicit disk size is configured.
    pub(crate) fn hybrid_block_cache_disk_size(
        &self,
        memory_capacity: usize,
    ) -> Result<Option<usize>> {
        if !self.block_cache_hybrid_enabled || memory_capacity == 0 {
            return Ok(None);
        }
        let disk = self
            .block_cache_hybrid_disk_size
            .map(|size| size_to_usize("block_cache_hybrid_disk_size", size))
            .transpose()
            .map_err(Error::ConfigError)?
            .unwrap_or(memory_capacity);
        Ok(Some(disk))
    }
    /// Picks the volume that will host the hybrid cache's disk tier.
    ///
    /// Only local (`file://`) volumes with the cache usage kind are eligible,
    /// and a volume with a size limit must fit the requested disk capacity.
    /// Cache-only volumes are preferred over volumes shared with primary
    /// data; ties within a group are broken randomly. Returns `Ok(None)` when
    /// the hybrid cache is inactive, and a `ConfigError` describing the most
    /// specific failure otherwise.
    pub(crate) fn resolve_hybrid_cache_volume_plan(
        &self,
        memory_capacity: usize,
    ) -> Result<Option<HybridCacheVolumePlan>> {
        let Some(disk_capacity_bytes) = self.hybrid_block_cache_disk_size(memory_capacity)? else {
            return Ok(None);
        };
        if disk_capacity_bytes == 0 {
            return Err(Error::ConfigError(
                "block_cache_hybrid_disk_size must be greater than 0 when hybrid cache is enabled"
                    .to_string(),
            ));
        }
        let required = disk_capacity_bytes as u64;
        let mut cache_only_candidates: Vec<HybridCacheVolumePlan> = Vec::new();
        let mut shared_candidates: Vec<HybridCacheVolumePlan> = Vec::new();
        // Track why no candidate was found, for precise error messages.
        let mut has_cache_volume = false;
        let mut has_local_cache_volume = false;
        for (idx, volume) in self.volumes.iter().enumerate() {
            if !volume.supports(VolumeUsageKind::Cache) {
                continue;
            }
            has_cache_volume = true;
            let normalized_base_dir = normalize_storage_path_to_url(&volume.base_dir)?;
            let url = Url::parse(&normalized_base_dir).map_err(|err| {
                Error::ConfigError(format!(
                    "Invalid cache volume URL {}: {}",
                    normalized_base_dir, err
                ))
            })?;
            // The disk tier must live on a local filesystem.
            if !url.scheme().eq_ignore_ascii_case("file") {
                continue;
            }
            has_local_cache_volume = true;
            let volume_limit = volume
                .size_limit
                .map(|limit| size_to_u64(&format!("volumes[{idx}].size_limit"), limit))
                .transpose()
                .map_err(Error::ConfigError)?;
            // An unlimited volume always fits.
            let fits = match volume_limit {
                Some(limit) => limit >= required,
                None => true,
            };
            if !fits {
                continue;
            }
            let shared_with_primary = supports_primary_data(volume);
            let plan = HybridCacheVolumePlan {
                volume_idx: idx,
                base_dir: normalized_base_dir,
                disk_capacity_bytes,
                shared_with_primary,
            };
            if !shared_with_primary {
                cache_only_candidates.push(plan);
            } else {
                shared_candidates.push(plan);
            }
        }
        // Prefer dedicated cache volumes; fall back to shared ones.
        let mut rng = rand::thread_rng();
        if let Some(plan) = cache_only_candidates.choose(&mut rng) {
            return Ok(Some(plan.clone()));
        }
        if let Some(plan) = shared_candidates.choose(&mut rng) {
            return Ok(Some(plan.clone()));
        }
        if !has_cache_volume {
            return Err(Error::ConfigError(
                "Hybrid block cache enabled but no volume is configured with cache usage"
                    .to_string(),
            ));
        }
        if !has_local_cache_volume {
            return Err(Error::ConfigError(
                "Hybrid block cache requires a local file:// cache volume".to_string(),
            ));
        }
        Err(Error::ConfigError(format!(
            "No cache volume has enough capacity for hybrid block cache disk size {} bytes",
            disk_capacity_bytes
        )))
    }
    /// When the planned cache volume is shared with primary data, returns a
    /// copy of the config with that volume's size limit reduced by the cache
    /// reservation, so primary data cannot consume the cache's share.
    /// No-op (clone) when there is no plan or the volume is cache-only.
    pub(crate) fn apply_hybrid_cache_primary_partition_with_plan(
        &self,
        plan: Option<&HybridCacheVolumePlan>,
    ) -> Result<Self> {
        let Some(plan) = plan else {
            return Ok(self.clone());
        };
        if !plan.shared_with_primary {
            return Ok(self.clone());
        }
        let mut adjusted = self.clone();
        let disk_bytes = plan.disk_capacity_bytes as u64;
        let volume = adjusted.volumes.get_mut(plan.volume_idx).ok_or_else(|| {
            Error::ConfigError(format!(
                "Selected hybrid cache volume index {} out of range",
                plan.volume_idx
            ))
        })?;
        if let Some(limit) = volume.size_limit {
            let limit = size_to_u64(&format!("volumes[{}].size_limit", plan.volume_idx), limit)
                .map_err(Error::ConfigError)?;
            // The reservation must leave room for primary data.
            if limit <= disk_bytes {
                return Err(Error::ConfigError(format!(
                    "Hybrid cache reservation {} bytes exceeds shared volume limit {} bytes for {}",
                    disk_bytes, limit, volume.base_dir
                )));
            }
            volume.size_limit = Some(Size::from_const((limit - disk_bytes) as i64));
        }
        Ok(adjusted)
    }
    /// Loads a configuration from a file, inferring the format from the
    /// extension (json / yaml / yml / toml / ini).
    ///
    /// For json/yaml/toml the file is additionally pre-parsed to warn about
    /// unrecognized keys (ini is not pre-parsed). Values are layered over
    /// defaults and sizes validated, as in `from_json_str`.
    pub fn from_path(path: impl AsRef<std::path::Path>) -> Result<Self> {
        let path = path.as_ref();
        let extension = path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.to_lowercase())
            .ok_or_else(|| Error::ConfigError("Config path missing extension".to_string()))?;
        let format = match extension.as_str() {
            "yaml" | "yml" => ConfigFileFormat::Yaml,
            "ini" => ConfigFileFormat::Ini,
            "json" => ConfigFileFormat::Json,
            "toml" => ConfigFileFormat::Toml,
            _ => {
                return Err(Error::ConfigError(format!(
                    "Unsupported config format: {}",
                    extension
                )));
            }
        };
        // Pre-parse into a JSON value (where supported) purely for
        // unknown-key warnings; the config crate does the real load below.
        let provided = match extension.as_str() {
            "json" => {
                let contents = std::fs::read_to_string(path)
                    .map_err(|err| Error::ConfigError(err.to_string()))?;
                let parsed = serde_json::from_str::<JsonValue>(&contents)
                    .map_err(|err| Error::ConfigError(err.to_string()))?;
                Some(parsed)
            }
            "yaml" | "yml" => {
                let contents = std::fs::read_to_string(path)
                    .map_err(|err| Error::ConfigError(err.to_string()))?;
                let parsed = serde_yaml::from_str::<serde_yaml::Value>(&contents)
                    .map_err(|err| Error::ConfigError(err.to_string()))?;
                Some(
                    serde_json::to_value(parsed)
                        .map_err(|err| Error::ConfigError(err.to_string()))?,
                )
            }
            "toml" => {
                let contents = std::fs::read_to_string(path)
                    .map_err(|err| Error::ConfigError(err.to_string()))?;
                let parsed = toml::from_str::<TomlValue>(&contents)
                    .map_err(|err| Error::ConfigError(err.to_string()))?;
                Some(
                    serde_json::to_value(parsed)
                        .map_err(|err| Error::ConfigError(err.to_string()))?,
                )
            }
            _ => None,
        };
        let schema = serde_json::to_value(Config::default())
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        if let Some(provided) = provided.as_ref() {
            let unrecognized = collect_unrecognized_entry_paths(provided, &schema, "");
            for entry in unrecognized {
                warn!("unrecognized entry: {}", entry);
            }
        }
        let default_json = serde_json::to_string(&Config::default())
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        // Defaults first, file contents second, so file entries win.
        let mut builder = ConfigLoader::builder();
        builder = builder.add_source(ConfigFile::from_str(&default_json, ConfigFileFormat::Json));
        builder = builder.add_source(ConfigFile::from(path).format(format));
        let config: Config = builder
            .build()
            .map_err(|err| Error::ConfigError(err.to_string()))?
            .try_deserialize()
            .map_err(|err| Error::ConfigError(err.to_string()))?;
        config.validate_sizes()?;
        Ok(config)
    }
    /// Effective write-stall limit: the configured value when it exceeds
    /// `l0_file_limit + 1`, otherwise a default of
    /// `min(l0_file_limit + 4, l0_file_limit * 2)` (with a warning if the
    /// configured value was rejected).
    pub(crate) fn resolved_write_stall_limit(&self) -> usize {
        let default_limit = self
            .l0_file_limit
            .saturating_add(4)
            .min(self.l0_file_limit.saturating_mul(2));
        match self.write_stall_limit {
            Some(limit) => {
                if limit > self.l0_file_limit.saturating_add(1) {
                    limit
                } else {
                    warn!(
                        "write stall limit {} invalid for l0 limit {}; using default as {}",
                        limit, self.l0_file_limit, default_limit
                    );
                    default_limit
                }
            }
            _ => default_limit,
        }
    }
    /// Compression algorithm for `level`. Levels past the configured list
    /// reuse the last entry; an empty list falls back to None for L0/L1 and
    /// Lz4 from L2 down.
    pub(crate) fn sst_compression_for_level(&self, level: u8) -> SstCompressionAlgorithm {
        if self.sst_compression_by_level.is_empty() {
            return if level >= 2 {
                SstCompressionAlgorithm::Lz4
            } else {
                SstCompressionAlgorithm::None
            };
        }
        let idx = level as usize;
        if idx < self.sst_compression_by_level.len() {
            self.sst_compression_by_level[idx]
        } else {
            *self
                .sst_compression_by_level
                .last()
                .expect("compression config not empty")
        }
    }
    /// Eagerly converts every size-typed field so malformed sizes fail at
    /// load time rather than on first use.
    fn validate_sizes(&self) -> Result<()> {
        self.memtable_capacity_bytes()?;
        self.l1_base_bytes_bytes()?;
        self.block_cache_size_bytes()?;
        self.block_cache_hybrid_disk_size_bytes()?;
        self.reader.block_cache_size_bytes()?;
        self.base_file_size_bytes()?;
        self.parquet_row_group_size_bytes()?;
        self.value_separation_threshold_bytes()?;
        for (idx, volume) in self.volumes.iter().enumerate() {
            if let Some(limit) = volume.size_limit {
                size_to_u64(&format!("volumes[{idx}].size_limit"), limit)
                    .map_err(Error::ConfigError)?;
            }
        }
        Ok(())
    }
}
/// Recursively walks `provided` against `schema` (a JSON rendering of the
/// default config) and returns the dotted paths of entries the schema does
/// not recognize. Arrays are checked element-wise against the schema array's
/// first element; scalar/mismatched pairs are never reported.
///
/// Fix: the recursive call sites passed a mojibake token (`¤t_path`,
/// i.e. `&curren`-entity corruption of `&current_path`), which does not
/// compile; restored the intended `&current_path` argument.
fn collect_unrecognized_entry_paths(
    provided: &JsonValue,
    schema: &JsonValue,
    path: &str,
) -> Vec<String> {
    match (provided, schema) {
        (JsonValue::Object(provided_map), JsonValue::Object(schema_map)) => {
            let mut unknown = Vec::new();
            for (key, value) in provided_map {
                let current_path = if path.is_empty() {
                    key.clone()
                } else {
                    format!("{}.{}", path, key)
                };
                if let Some(schema_value) = schema_map.get(key) {
                    // Known key: recurse to validate nested entries.
                    unknown.extend(collect_unrecognized_entry_paths(
                        value,
                        schema_value,
                        &current_path,
                    ));
                } else {
                    unknown.push(current_path);
                }
            }
            unknown
        }
        (JsonValue::Array(provided_items), JsonValue::Array(schema_items)) => {
            let mut unknown = Vec::new();
            // Use the first schema element as the template for every item.
            if let Some(schema_item) = schema_items.first() {
                for (idx, provided_item) in provided_items.iter().enumerate() {
                    let current_path = format!("{}[{}]", path, idx);
                    unknown.extend(collect_unrecognized_entry_paths(
                        provided_item,
                        schema_item,
                        &current_path,
                    ));
                }
            }
            unknown
        }
        _ => Vec::new(),
    }
}
#[cfg(test)]
mod tests {
use super::{
Config, Error, MemtableType, PrimaryVolumeOffloadPolicyKind, ReaderConfigEntry,
VolumeDescriptor, VolumeUsageKind,
};
use crate::SstCompressionAlgorithm;
use crate::data_file::DataFileType;
use size::Size;
use std::io::Write;
use std::path::PathBuf;
use tempfile::Builder;
// Round-trips a fully-populated Config through JSON and YAML temp files and a
// checked-in INI fixture, asserting field-level equality after decode.
#[test]
fn test_config_from_file_round_trip() {
    let mut volume = VolumeDescriptor::new(
        "file:///tmp/cobble".to_string(),
        vec![
            VolumeUsageKind::PrimaryDataPriorityHigh,
            VolumeUsageKind::Meta,
        ],
    );
    volume.custom_options = Some(
        [
            ("endpoint".to_string(), "http://127.0.0.1:9000".to_string()),
            ("region".to_string(), "us-east-1".to_string()),
        ]
        .into_iter()
        .collect(),
    );
    // Every field deliberately differs from the default so that a decode
    // falling back to defaults would be caught.
    let config = Config {
        volumes: vec![volume],
        memtable_capacity: Size::from_kib(1),
        memtable_buffer_count: 3,
        memtable_type: MemtableType::Vec,
        num_columns: 2,
        total_buckets: 1024,
        l0_file_limit: 5,
        write_stall_limit: Some(12),
        l1_base_bytes: Size::from_kib(8),
        level_size_multiplier: 7,
        max_level: 4,
        compaction_policy: super::CompactionPolicyKind::MinOverlap,
        block_cache_size: Size::from_const(256),
        block_cache_hybrid_enabled: true,
        block_cache_hybrid_disk_size: Some(Size::from_kib(1)),
        reader: ReaderConfigEntry {
            pin_partition_in_memory_count: 2,
            block_cache_size: Size::from_kib(2),
            reload_tolerance_seconds: 5,
        },
        base_file_size: Size::from_const(512),
        sst_bloom_filter_enabled: true,
        sst_bloom_bits_per_key: 11,
        sst_partitioned_index: true,
        data_file_type: DataFileType::Parquet,
        parquet_row_group_size_bytes: Size::from_kib(4),
        sst_compression_by_level: vec![
            SstCompressionAlgorithm::None,
            SstCompressionAlgorithm::None,
            SstCompressionAlgorithm::Lz4,
        ],
        ttl_enabled: true,
        default_ttl_seconds: Some(120),
        value_separation_threshold: Some(Size::from_kib(4)),
        time_provider: crate::time::TimeProviderKind::Manual,
        log_path: Some("/tmp/cobble.log".to_string()),
        log_console: true,
        log_level: log::LevelFilter::Debug,
        snapshot_on_flush: true,
        active_memtable_incremental_snapshot_ratio: 0.5,
        lsm_split_trigger_level: Some(2),
        primary_volume_write_stop_watermark: 0.93,
        primary_volume_offload_trigger_watermark: 0.82,
        primary_volume_offload_policy: PrimaryVolumeOffloadPolicyKind::LargestFile,
        snapshot_retention: Some(3),
        compaction_read_ahead_enabled: false,
        compaction_remote_addr: Some("127.0.0.1:9999".to_string()),
        compaction_threads: 6,
        compaction_remote_timeout_ms: 120_000,
        compaction_server_max_concurrent: 8,
        compaction_server_max_queued: 32,
    };
    // JSON round trip via a temp file.
    let serialized = serde_json::to_string(&config).expect("Cannot serialize config");
    let mut json_file = Builder::new()
        .suffix(".json")
        .tempfile()
        .expect("Should create temp json");
    json_file
        .write_all(serialized.as_bytes())
        .expect("Should able to write json");
    json_file.flush().expect("Should able to flush json");
    let decoded: Config = Config::from_path(json_file.path()).expect("Cannot deserialize json");
    assert_eq!(decoded.volumes.len(), 1);
    assert!(decoded.volumes[0].supports(VolumeUsageKind::PrimaryDataPriorityHigh));
    assert!(decoded.volumes[0].supports(VolumeUsageKind::Meta));
    assert_eq!(
        decoded.volumes[0]
            .custom_options
            .as_ref()
            .and_then(|v| v.get("endpoint")),
        Some(&"http://127.0.0.1:9000".to_string())
    );
    assert_eq!(decoded.memtable_capacity, Size::from_kib(1));
    assert_eq!(decoded.memtable_type, MemtableType::Vec);
    assert_eq!(decoded.total_buckets, 1024);
    assert_eq!(decoded.write_stall_limit, Some(12));
    assert_eq!(
        decoded.compaction_policy,
        super::CompactionPolicyKind::MinOverlap
    );
    assert!(decoded.sst_partitioned_index);
    assert_eq!(decoded.time_provider, crate::time::TimeProviderKind::Manual);
    assert_eq!(decoded.log_level, log::LevelFilter::Debug);
    assert_eq!(decoded.snapshot_retention, Some(3));
    assert_eq!(decoded.active_memtable_incremental_snapshot_ratio, 0.5);
    assert_eq!(decoded.lsm_split_trigger_level, Some(2));
    assert_eq!(decoded.primary_volume_write_stop_watermark, 0.93);
    assert_eq!(decoded.primary_volume_offload_trigger_watermark, 0.82);
    assert_eq!(
        decoded.primary_volume_offload_policy,
        PrimaryVolumeOffloadPolicyKind::LargestFile
    );
    assert_eq!(decoded.value_separation_threshold, Some(Size::from_kib(4)));
    assert_eq!(decoded.compaction_server_max_concurrent, 8);
    assert_eq!(decoded.compaction_server_max_queued, 32);
    assert_eq!(decoded.data_file_type, DataFileType::Parquet);
    assert_eq!(decoded.parquet_row_group_size_bytes, Size::from_kib(4));
    assert_eq!(decoded.reader.block_cache_size, Size::from_kib(2));
    assert_eq!(decoded.reader.reload_tolerance_seconds, 5);
    assert!(decoded.block_cache_hybrid_enabled);
    assert_eq!(
        decoded.block_cache_hybrid_disk_size,
        Some(Size::from_kib(1))
    );
    // YAML round trip via a temp file.
    let yaml = serde_yaml::to_string(&config).expect("Cannot serialize yaml");
    let mut yaml_file = Builder::new()
        .suffix(".yaml")
        .tempfile()
        .expect("Should create temp yaml");
    yaml_file
        .write_all(yaml.as_bytes())
        .expect("Should able to write yaml");
    yaml_file.flush().expect("Should able to flush yaml");
    let decoded_yaml: Config =
        Config::from_path(yaml_file.path()).expect("Cannot deserialize yaml");
    assert_eq!(decoded_yaml.reader.block_cache_size, Size::from_kib(2));
    // INI decode from the checked-in fixture.
    let mut path_buf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    path_buf.push("tests/testdata/config.ini");
    let decoded_ini = Config::from_path(path_buf.as_path()).expect("Cannot deserialize ini");
    assert_eq!(decoded_ini.memtable_capacity, Size::from_kib(1));
    assert_eq!(decoded_ini.reader.block_cache_size, Size::from_kib(2));
    assert_eq!(decoded_ini.data_file_type, DataFileType::SSTable);
}
// Deserializing `kinds` from a list of names sets each corresponding bit.
#[test]
fn test_volume_descriptor_kinds_list() {
    let json = r#"{
        "base_dir": "file:///tmp/cobble",
        "kinds": ["meta", "primary_data_priority_high", "snapshot", "cache"]
    }"#;
    let volume: VolumeDescriptor =
        serde_json::from_str(json).expect("Cannot deserialize volume descriptor");
    assert!(volume.supports(VolumeUsageKind::Meta));
    assert!(volume.supports(VolumeUsageKind::PrimaryDataPriorityHigh));
    assert!(volume.supports(VolumeUsageKind::Snapshot));
    assert!(volume.supports(VolumeUsageKind::Cache));
}
// A readonly-only kinds list sets exactly the readonly bit.
#[test]
fn test_volume_descriptor_kinds_readonly() {
    let json = r#"{
        "base_dir": "file:///tmp/cobble-readonly",
        "kinds": ["readonly"]
    }"#;
    let volume: VolumeDescriptor =
        serde_json::from_str(json).expect("Cannot deserialize readonly volume descriptor");
    assert!(volume.supports(VolumeUsageKind::Readonly));
    assert!(!volume.supports(VolumeUsageKind::Snapshot));
    assert!(!volume.supports(VolumeUsageKind::PrimaryDataPriorityHigh));
}
// A bare local filesystem path is normalized to a file:// URL.
#[test]
fn test_normalize_volume_paths_converts_local_absolute_path() {
    let mut config = Config::default();
    let local = std::env::temp_dir().join("cobble-config-normalize");
    config.volumes = VolumeDescriptor::single_volume(local.to_string_lossy().to_string());
    let config = config.normalize_volume_paths().unwrap();
    assert!(config.volumes[0].base_dir.starts_with("file://"));
}
// When both a shared (primary+cache) and a cache-only volume exist, the plan
// picks the cache-only volume.
#[test]
fn test_hybrid_cache_prefers_cache_only_volume() {
    let mut config = Config::default();
    config.block_cache_hybrid_enabled = true;
    config.block_cache_hybrid_disk_size = Some(Size::from_kib(1));
    config.volumes = vec![
        VolumeDescriptor::new(
            "file:///tmp/primary-shared".to_string(),
            vec![
                VolumeUsageKind::PrimaryDataPriorityHigh,
                VolumeUsageKind::Cache,
                VolumeUsageKind::Meta,
            ],
        ),
        VolumeDescriptor::new(
            "file:///tmp/cache-only".to_string(),
            vec![VolumeUsageKind::Cache],
        ),
    ];
    let plan = config
        .resolve_hybrid_cache_volume_plan(2048)
        .unwrap()
        .unwrap();
    assert_eq!(plan.volume_idx, 1);
    assert!(!plan.shared_with_primary);
    assert_eq!(plan.disk_capacity_bytes, 1024);
}
// A shared cache volume's size limit is reduced by the cache reservation
// (8 KiB limit - 1 KiB cache = 7 KiB).
#[test]
fn test_hybrid_cache_partitions_shared_volume_limit() {
    let mut config = Config::default();
    config.block_cache_hybrid_enabled = true;
    config.block_cache_hybrid_disk_size = Some(Size::from_kib(1));
    let mut shared = VolumeDescriptor::new(
        "file:///tmp/shared".to_string(),
        vec![
            VolumeUsageKind::PrimaryDataPriorityHigh,
            VolumeUsageKind::Cache,
            VolumeUsageKind::Meta,
        ],
    );
    shared.size_limit = Some(Size::from_kib(8));
    config.volumes = vec![shared];
    let plan = config.resolve_hybrid_cache_volume_plan(4096).unwrap();
    let adjusted = config
        .apply_hybrid_cache_primary_partition_with_plan(plan.as_ref())
        .unwrap();
    assert_eq!(adjusted.volumes[0].size_limit, Some(Size::from_kib(7)));
}
// A remote (s3://) cache volume cannot host the hybrid cache's disk tier.
#[test]
fn test_hybrid_cache_rejects_non_local_cache_volume() {
    let mut config = Config::default();
    config.block_cache_hybrid_enabled = true;
    config.block_cache_hybrid_disk_size = Some(Size::from_kib(1));
    config.volumes = vec![VolumeDescriptor::new(
        "s3://bucket/cache".to_string(),
        vec![VolumeUsageKind::Cache],
    )];
    let err = config.resolve_hybrid_cache_volume_plan(2048).unwrap_err();
    assert!(matches!(err, Error::ConfigError(_)));
}
// Direct serde deserialization (without the defaults-layering loaders)
// rejects a config missing required fields.
#[test]
fn test_data_file_type_missing_field_is_rejected() {
    let json = r#"{
        "volumes": [{"base_dir":"file:///tmp/cobble","kinds":["meta","primary_data_priority_high"]}],
        "num_columns": 1
    }"#;
    let err = serde_json::from_str::<Config>(json).unwrap_err();
    assert!(err.to_string().contains("missing field"));
}
// Parquet data-file settings survive a serde JSON round trip.
#[test]
fn test_data_file_type_parquet_round_trip() {
    let mut expected = Config::default();
    expected.data_file_type = DataFileType::Parquet;
    expected.parquet_row_group_size_bytes = Size::from_kib(8);
    let json = serde_json::to_string(&expected).expect("Cannot serialize config");
    let decoded: Config =
        serde_json::from_str(&json).expect("Cannot deserialize parquet config");
    assert_eq!(decoded.data_file_type, DataFileType::Parquet);
    assert_eq!(decoded.parquet_row_group_size_bytes, Size::from_kib(8));
}
#[test]
fn test_config_from_path_allows_partial_entries() {
    // A file that specifies only some fields should parse, with every
    // unspecified field falling back to its default.
    let json = r#"{
"volumes": [{"base_dir":"file:///tmp/cobble","kinds":["meta","primary_data_priority_high"]}],
"memtable_capacity": 2048
}"#;
    let mut tmp = Builder::new()
        .suffix(".json")
        .tempfile()
        .expect("Should create temp json");
    tmp.write_all(json.as_bytes())
        .expect("Should be able to write json");
    tmp.flush().expect("Should be able to flush json");
    let parsed = Config::from_path(tmp.path()).expect("Cannot deserialize partial json");
    let defaults = Config::default();
    assert_eq!(parsed.memtable_capacity, Size::from_kib(2));
    assert_eq!(parsed.num_columns, defaults.num_columns);
    assert_eq!(parsed.data_file_type, defaults.data_file_type);
}
#[test]
fn test_config_from_json_str_allows_partial_entries() {
    // Same partial-config contract as the from_path test, but through the
    // in-memory JSON entry point.
    let json = r#"{
"volumes": [{"base_dir":"file:///tmp/cobble","kinds":["meta","primary_data_priority_high"]}],
"memtable_capacity": 2048
}"#;
    let parsed = Config::from_json_str(json).expect("Cannot deserialize partial json");
    let defaults = Config::default();
    assert_eq!(parsed.memtable_capacity, Size::from_kib(2));
    assert_eq!(parsed.num_columns, defaults.num_columns);
    assert_eq!(parsed.data_file_type, defaults.data_file_type);
}
#[test]
fn test_config_from_path_parses_human_readable_sizes() {
    // Size-typed fields accept human-readable strings: decimal units (MB, KB)
    // resolve to powers of 1000, binary units (MiB, GiB) to powers of 1024.
    let yaml = r#"
volumes:
- base_dir: "file:///tmp/cobble"
kinds: ["meta", "primary_data_priority_high"]
size_limit: "2GiB"
memtable_capacity: "64MB"
l1_base_bytes: "128MiB"
block_cache_size: "32MB"
block_cache_hybrid_disk_size: "1GiB"
reader:
pin_partition_in_memory_count: 1
block_cache_size: "512MB"
reload_tolerance_seconds: 10
base_file_size: "64MiB"
parquet_row_group_size_bytes: "256KB"
value_separation_threshold: "4MB"
"#;
    let mut tmp = Builder::new()
        .suffix(".yaml")
        .tempfile()
        .expect("should create temp yaml");
    tmp.write_all(yaml.as_bytes())
        .expect("should write temp yaml");
    tmp.flush().expect("should flush temp yaml");
    let cfg = Config::from_path(tmp.path()).expect("should parse human-readable sizes");
    assert_eq!(cfg.memtable_capacity, Size::from_const(64_000_000));
    assert_eq!(cfg.l1_base_bytes, Size::from_mib(128));
    assert_eq!(cfg.block_cache_size, Size::from_const(32_000_000));
    assert_eq!(cfg.block_cache_hybrid_disk_size, Some(Size::from_gib(1)));
    assert_eq!(cfg.reader.block_cache_size, Size::from_const(512_000_000));
    assert_eq!(cfg.base_file_size, Size::from_mib(64));
    assert_eq!(cfg.parquet_row_group_size_bytes, Size::from_const(256_000));
    assert_eq!(
        cfg.value_separation_threshold,
        Some(Size::from_const(4_000_000))
    );
    assert_eq!(cfg.volumes[0].size_limit, Some(Size::from_gib(2)));
}
#[test]
fn test_config_from_path_rejects_invalid_size_unit() {
    // "MEGA" is not a recognized size suffix, so parsing must surface a
    // config error rather than silently defaulting.
    let yaml = r#"
volumes:
- base_dir: "file:///tmp/cobble"
kinds: ["meta", "primary_data_priority_high"]
memtable_capacity: "64MEGA"
"#;
    let mut tmp = Builder::new()
        .suffix(".yaml")
        .tempfile()
        .expect("should create temp yaml");
    tmp.write_all(yaml.as_bytes())
        .expect("should write temp yaml");
    tmp.flush().expect("should flush temp yaml");
    let failure = Config::from_path(tmp.path()).expect_err("invalid unit should be rejected");
    assert!(matches!(failure, Error::ConfigError(_)));
}
#[test]
fn test_collect_unrecognized_entry_paths() {
    // Plant unknown keys at the top level, inside a nested object, and inside
    // an array element; all three paths must be reported.
    let candidate = serde_json::json!({
        "num_columns": 1,
        "unknown_top": 1,
        "reader": {
            "block_cache_size": 1024,
            "unknown_nested": true
        },
        "volumes": [{
            "base_dir": "file:///tmp/cobble",
            "kinds": ["meta", "primary_data_priority_high"],
            "unknown_volume_key": "x"
        }]
    });
    let reference = serde_json::to_value(Config::default()).expect("serialize default config");
    let flagged = super::collect_unrecognized_entry_paths(&candidate, &reference, "");
    for expected in [
        "unknown_top",
        "reader.unknown_nested",
        "volumes[0].unknown_volume_key",
    ] {
        assert!(flagged.contains(&expected.to_string()));
    }
}
#[test]
fn test_template_config_yaml_is_valid() {
    // The template config shipped with the crate must parse and carry the
    // expected defaults so users can start from it unmodified.
    let template = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../template/config.yaml");
    let cfg = Config::from_path(template).expect("template/config.yaml should be valid");
    assert_eq!(cfg.total_buckets, 1);
    assert_eq!(cfg.memtable_type, MemtableType::Hash);
    assert_eq!(cfg.data_file_type, DataFileType::SSTable);
    assert_eq!(
        cfg.primary_volume_offload_policy,
        PrimaryVolumeOffloadPolicyKind::Priority
    );
    assert_eq!(cfg.volumes.len(), 1);
    let primary = &cfg.volumes[0];
    assert!(primary.supports(VolumeUsageKind::PrimaryDataPriorityHigh));
    assert!(primary.supports(VolumeUsageKind::Meta));
}
}