use std::fmt::{Debug, Formatter};
pub use fusio::path::Path;
#[cfg(feature = "aws")]
pub use fusio::remotes::aws::AwsCredential;
pub use fusio_dispatch::FsOptions;
use parquet::{
basic::Compression,
file::properties::{EnabledStatistics, WriterProperties},
};
use thiserror::Error;
use crate::{
fs::{FileId, FileType},
record::{Record, Schema},
trigger::TriggerType,
version::{Version, MAX_LEVEL},
};
/// Default value that `DbOption::new` assigns to `wal_buffer_size`: 4 * 1024
/// (presumably bytes — confirm against the WAL writer).
const DEFAULT_WAL_BUFFER_SIZE: usize = 4 * 1024;
/// Configuration for a database instance.
///
/// Build one with `DbOption::new` and refine it with the chained, by-value
/// setter methods. Defaults quoted below are the ones `DbOption::new` assigns.
#[derive(Clone)]
pub struct DbOption {
/// Default 10. NOTE(review): presumably the capacity of the channel feeding
/// the file cleaner — confirm against the cleaner code.
pub(crate) clean_channel_buffer: usize,
/// Root path. The `wal` and `version` directories are children of it, and
/// table files of levels without their own path are placed directly under it.
pub(crate) base_path: Path,
/// File system options paired with `base_path`; defaults to `FsOptions::Local`.
pub(crate) base_fs: FsOptions,
/// Per-level override of where table files live, indexed by level.
/// `DbOption::new` creates `MAX_LEVEL` entries, all `None`; `None` means the
/// level falls back to `base_path`.
pub(crate) level_paths: Vec<Option<(Path, FsOptions)>>,
/// Default 3. NOTE(review): presumably the number of immutable chunks that
/// triggers a flush — confirm against the compactor.
pub(crate) immutable_chunk_num: usize,
/// Default 5. NOTE(review): presumably the hard cap on immutable chunks —
/// confirm against the compactor. No public setter exists in this file.
pub(crate) immutable_chunk_max_num: usize,
/// Per-level growth factor of the major-compaction threshold
/// (see `is_threshold_exceeded_major`). Default 10.
pub(crate) level_sst_magnification: usize,
/// Default 3. NOTE(review): semantics not visible in this file — confirm
/// against the major-compaction code.
pub(crate) major_default_oldest_table_num: usize,
/// Default 4. NOTE(review): semantics not visible in this file — confirm
/// against the major-compaction code. No public setter exists in this file.
pub(crate) major_l_selection_table_max_num: usize,
/// Base table count of the major-compaction threshold: level `n` is over
/// the threshold once it holds at least
/// `major_threshold_with_sst_size * level_sst_magnification^n` tables. Default 4.
pub(crate) major_threshold_with_sst_size: usize,
/// Default 256 * 1024 * 1024 (presumably bytes — confirm).
pub(crate) max_sst_file_size: usize,
/// Default 200. NOTE(review): presumably the number of version-log edits
/// after which a snapshot is taken — confirm against the version set.
pub(crate) version_log_snapshot_threshold: u32,
/// Default `TriggerType::SizeOfMem(64 * 1024 * 1024)`. No public setter
/// exists in this file.
pub(crate) trigger_type: TriggerType,
/// `true` by default; cleared by `disable_wal`.
pub(crate) use_wal: bool,
/// Defaults to `DEFAULT_WAL_BUFFER_SIZE`.
pub(crate) wal_buffer_size: usize,
/// Parquet writer properties; the default is derived from the schema's
/// primary key in `DbOption::new`.
pub(crate) write_parquet_properties: WriterProperties,
}
impl DbOption {
    /// Creates the default option set rooted at `base_path`.
    ///
    /// The Parquet writer properties are derived from the primary key reported
    /// by `schema.primary_key_path()`: page-level statistics and a bloom filter
    /// are enabled for the key column, and the key's sorting columns are
    /// recorded. Files are LZ4-compressed and stamped with the crate version.
    ///
    /// All other settings start from fixed defaults: the WAL is enabled, every
    /// level uses `base_path`, and the base file system is `FsOptions::Local`.
    pub fn new<S: Schema>(base_path: Path, schema: &S) -> Self {
        let (column_paths, sorting_columns) = schema.primary_key_path();

        let write_parquet_properties = WriterProperties::builder()
            .set_compression(Compression::LZ4)
            .set_column_statistics_enabled(column_paths.clone(), EnabledStatistics::Page)
            // Last use of `column_paths`: move it instead of cloning a second time.
            .set_column_bloom_filter_enabled(column_paths, true)
            .set_sorting_columns(Some(sorting_columns))
            .set_created_by(concat!("tonbo version ", env!("CARGO_PKG_VERSION")).to_owned())
            .build();

        DbOption {
            clean_channel_buffer: 10,
            base_path,
            base_fs: FsOptions::Local,
            // One slot per level; `None` means "store this level under `base_path`".
            level_paths: vec![None; MAX_LEVEL],
            immutable_chunk_num: 3,
            immutable_chunk_max_num: 5,
            level_sst_magnification: 10,
            major_default_oldest_table_num: 3,
            major_l_selection_table_max_num: 4,
            major_threshold_with_sst_size: 4,
            max_sst_file_size: 256 * 1024 * 1024,
            version_log_snapshot_threshold: 200,
            trigger_type: TriggerType::SizeOfMem(64 * 1024 * 1024),
            use_wal: true,
            wal_buffer_size: DEFAULT_WAL_BUFFER_SIZE,
            write_parquet_properties,
        }
    }
}
/// Chained setters: every method takes the option set by value, overrides a
/// single setting and returns the updated value.
impl DbOption {
    /// Replaces the base path.
    pub fn path(mut self, path: impl Into<Path>) -> Self {
        self.base_path = path.into();
        self
    }

    /// Overrides `immutable_chunk_num`.
    pub fn immutable_chunk_num(mut self, immutable_chunk_num: usize) -> Self {
        self.immutable_chunk_num = immutable_chunk_num;
        self
    }

    /// Overrides the base table count used for the major-compaction threshold.
    pub fn major_threshold_with_sst_size(mut self, major_threshold_with_sst_size: usize) -> Self {
        self.major_threshold_with_sst_size = major_threshold_with_sst_size;
        self
    }

    /// Overrides the per-level growth factor of the major-compaction threshold.
    pub fn level_sst_magnification(mut self, level_sst_magnification: usize) -> Self {
        self.level_sst_magnification = level_sst_magnification;
        self
    }

    /// Overrides `max_sst_file_size`.
    pub fn max_sst_file_size(mut self, max_sst_file_size: usize) -> Self {
        self.max_sst_file_size = max_sst_file_size;
        self
    }

    /// Overrides `clean_channel_buffer`.
    pub fn clean_channel_buffer(mut self, clean_channel_buffer: usize) -> Self {
        self.clean_channel_buffer = clean_channel_buffer;
        self
    }

    /// Replaces the Parquet writer properties wholesale, including the
    /// primary-key settings that `DbOption::new` derived from the schema.
    pub fn write_parquet_option(mut self, write_parquet_properties: WriterProperties) -> Self {
        self.write_parquet_properties = write_parquet_properties;
        self
    }

    /// Turns the write-ahead log off (`use_wal = false`).
    pub fn disable_wal(mut self) -> Self {
        self.use_wal = false;
        self
    }

    /// Overrides `wal_buffer_size`.
    pub fn wal_buffer_size(mut self, wal_buffer_size: usize) -> Self {
        self.wal_buffer_size = wal_buffer_size;
        self
    }

    /// Overrides `major_default_oldest_table_num`.
    pub fn major_default_oldest_table_num(mut self, major_default_oldest_table_num: usize) -> Self {
        self.major_default_oldest_table_num = major_default_oldest_table_num;
        self
    }

    /// Overrides `version_log_snapshot_threshold`.
    pub fn version_log_snapshot_threshold(mut self, version_log_snapshot_threshold: u32) -> Self {
        self.version_log_snapshot_threshold = version_log_snapshot_threshold;
        self
    }

    /// Assigns a dedicated path and file system options to `level`.
    ///
    /// # Errors
    ///
    /// Returns [`ExceedsMaxLevel`] when `level` is not below `MAX_LEVEL`; the
    /// option set is consumed in that case.
    pub fn level_path(
        mut self,
        level: usize,
        path: Path,
        fs_options: FsOptions,
    ) -> Result<Self, ExceedsMaxLevel> {
        if level < MAX_LEVEL {
            self.level_paths[level] = Some((path, fs_options));
            Ok(self)
        } else {
            Err(ExceedsMaxLevel)
        }
    }

    /// Replaces the file system options paired with the base path.
    pub fn base_fs(self, base_fs: FsOptions) -> Self {
        DbOption { base_fs, ..self }
    }
}
/// Error returned by `DbOption::level_path` when the requested level is not
/// below `MAX_LEVEL`.
#[derive(Debug, Error)]
#[error("exceeds max level, max level is {}", MAX_LEVEL)]
pub struct ExceedsMaxLevel;
/// Crate-internal helpers that derive on-disk locations and the
/// major-compaction threshold from the configured options.
impl DbOption {
    /// Path of the Parquet file for table `gen` stored on `level`.
    ///
    /// The file is placed in the level's own directory when one was configured
    /// and directly under the base path otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `level` is out of bounds for `level_paths`.
    pub(crate) fn table_path(&self, gen: FileId, level: usize) -> Path {
        self.level_fs_path(level)
            .unwrap_or(&self.base_path)
            .child(format!("{}.{}", gen, FileType::Parquet))
    }

    /// Directory holding write-ahead-log files: `<base_path>/wal`.
    pub(crate) fn wal_dir_path(&self) -> Path {
        self.base_path.child("wal")
    }

    /// Path of the write-ahead-log file identified by `gen`.
    pub(crate) fn wal_path(&self, gen: FileId) -> Path {
        self.wal_dir_path()
            .child(format!("{}.{}", gen, FileType::Wal))
    }

    /// Directory holding version-log files: `<base_path>/version`.
    pub(crate) fn version_log_dir_path(&self) -> Path {
        self.base_path.child("version")
    }

    /// Path of the version-log file identified by `gen`.
    pub(crate) fn version_log_path(&self, gen: FileId) -> Path {
        self.version_log_dir_path()
            .child(format!("{}.{}", gen, FileType::Log))
    }

    /// Dedicated path configured for `level`, or `None` when the level uses
    /// the base path.
    ///
    /// # Panics
    ///
    /// Panics if `level` is out of bounds for `level_paths`.
    pub(crate) fn level_fs_path(&self, level: usize) -> Option<&Path> {
        self.level_paths[level].as_ref().map(|(path, _)| path)
    }

    /// Reports whether `level` of `version` holds at least
    /// `major_threshold_with_sst_size * level_sst_magnification^level` tables.
    ///
    /// The threshold is computed with saturating arithmetic: with large
    /// user-supplied factors it pins at `usize::MAX` instead of panicking
    /// (debug builds) or wrapping around to a small value (release builds).
    pub(crate) fn is_threshold_exceeded_major<R: Record>(
        &self,
        version: &Version<R>,
        level: usize,
    ) -> bool {
        // Clamp rather than truncate should `level` ever exceed `u32::MAX`.
        let exponent = level.min(u32::MAX as usize) as u32;
        let threshold = self
            .major_threshold_with_sst_size
            .saturating_mul(self.level_sst_magnification.saturating_pow(exponent));

        Version::<R>::tables_len(version, level) >= threshold
    }
}
/// Hand-written `Debug` for `DbOption`.
///
/// The file system options (`base_fs` and the `FsOptions` half of each
/// `level_paths` entry) are deliberately left out, so the output ends with
/// `..` to signal that it is not exhaustive.
/// NOTE(review): presumably `FsOptions` has no `Debug` impl and/or may carry
/// credentials — confirm before adding it here.
impl Debug for DbOption {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Show only the path of each per-level override; `None` marks a level
        // that falls back to `base_path`.
        let level_paths: Vec<_> = self
            .level_paths
            .iter()
            .map(|slot| slot.as_ref().map(|(path, _)| path))
            .collect();

        f.debug_struct("DbOption")
            .field("clean_channel_buffer", &self.clean_channel_buffer)
            .field("base_path", &self.base_path)
            .field("level_paths", &level_paths)
            .field("immutable_chunk_num", &self.immutable_chunk_num)
            .field("immutable_chunk_max_num", &self.immutable_chunk_max_num)
            .field("level_sst_magnification", &self.level_sst_magnification)
            .field(
                "major_default_oldest_table_num",
                &self.major_default_oldest_table_num,
            )
            .field(
                "major_l_selection_table_max_num",
                &self.major_l_selection_table_max_num,
            )
            .field(
                "major_threshold_with_sst_size",
                &self.major_threshold_with_sst_size,
            )
            .field("max_sst_file_size", &self.max_sst_file_size)
            .field(
                "version_log_snapshot_threshold",
                &self.version_log_snapshot_threshold,
            )
            .field("trigger_type", &self.trigger_type)
            .field("use_wal", &self.use_wal)
            .field("wal_buffer_size", &self.wal_buffer_size)
            .field("write_parquet_properties", &self.write_parquet_properties)
            .finish_non_exhaustive()
    }
}