use std::{
env, io,
sync::{Arc, atomic::AtomicU64},
time::Duration,
};
use arrow_schema::SchemaRef;
use fusio::{
DynFs, Fs,
dynamic::{MaybeSend, MaybeSync},
executor::{Executor, Timer},
fs::FsCas as FusioCas,
mem::fs::InMemoryFs,
path::{Path, PathPart},
};
#[cfg(feature = "tokio")]
use fusio::{disk::LocalFs, executor::tokio::TokioExecutor};
use fusio_manifest::{CheckpointStoreImpl, HeadStoreImpl, LeaseStoreImpl, SegmentStoreImpl};
use thiserror::Error;
use super::{DB, DbInner, MinorCompactionState};
use crate::{
compaction::{
CompactionWorkerConfig, MinorCompactor, executor::LocalCompactionExecutor,
metrics::CompactionMetrics, planner::CompactionStrategy,
},
extractor::{KeyExtractError, KeyProjection, projection_for_columns},
id::FileIdGenerator,
manifest::{
ManifestError, ManifestFs, TableMeta, TonboManifest, VersionState,
bootstrap::{ensure_manifest_dirs, init_fs_manifest},
},
mode::{DynModeConfig, table_definition},
ondisk::sstable::SsTableConfig,
transaction::CommitAckMode,
wal::{
WalConfig as RuntimeWalConfig, WalError, WalExt, WalRecoveryMode, WalSyncPolicy,
state::{FsWalStateStore, WalStateStore},
storage::WalStorage,
},
};
/// User-facing WAL overrides layered on top of the runtime WAL configuration.
///
/// Every field starts out unset. `apply` copies only the fields that were set
/// onto a runtime config, and `merge` lets the set fields of a newer override
/// win over an older one. Knobs whose runtime value is itself optional
/// (`segment_max_age`, `retention_bytes`, `state_store`) are wrapped twice so
/// that "explicitly set to `None`" stays distinguishable from "not set".
#[derive(Clone, Default)]
pub struct WalConfig {
    segment_max_bytes: Option<usize>,
    segment_max_age: Option<Option<Duration>>,
    flush_interval: Option<Duration>,
    sync: Option<WalSyncPolicy>,
    recovery: Option<WalRecoveryMode>,
    retention_bytes: Option<Option<usize>>,
    queue_size: Option<usize>,
    wal_dir: Option<Path>,
    segment_backend: Option<Arc<dyn DynFs>>,
    state_store: Option<Option<Arc<dyn WalStateStore>>>,
}

impl WalConfig {
    /// Override the maximum size of a WAL segment, in bytes.
    #[must_use]
    pub fn segment_max_bytes(self, bytes: usize) -> Self {
        Self {
            segment_max_bytes: Some(bytes),
            ..self
        }
    }

    /// Override the segment age limit; `None` explicitly clears it.
    #[must_use]
    pub fn segment_max_age(self, age: Option<Duration>) -> Self {
        Self {
            segment_max_age: Some(age),
            ..self
        }
    }

    /// Override the WAL flush interval.
    #[must_use]
    pub fn flush_interval(self, interval: Duration) -> Self {
        Self {
            flush_interval: Some(interval),
            ..self
        }
    }

    /// Override the WAL sync policy.
    #[must_use]
    pub fn sync_policy(self, policy: WalSyncPolicy) -> Self {
        Self {
            sync: Some(policy),
            ..self
        }
    }

    /// Override the WAL recovery mode.
    #[must_use]
    pub fn recovery_mode(self, mode: WalRecoveryMode) -> Self {
        Self {
            recovery: Some(mode),
            ..self
        }
    }

    /// Override the retention byte budget; `None` explicitly clears it.
    #[must_use]
    pub fn retention_bytes(self, retention: Option<usize>) -> Self {
        Self {
            retention_bytes: Some(retention),
            ..self
        }
    }

    /// Override the WAL queue size.
    #[must_use]
    pub fn queue_size(self, size: usize) -> Self {
        Self {
            queue_size: Some(size),
            ..self
        }
    }

    /// Override the directory WAL segments are written to.
    #[must_use]
    pub fn wal_dir(self, dir: Path) -> Self {
        Self {
            wal_dir: Some(dir),
            ..self
        }
    }

    /// Override the filesystem used to store WAL segments.
    #[must_use]
    pub fn segment_backend(self, backend: Arc<dyn DynFs>) -> Self {
        Self {
            segment_backend: Some(backend),
            ..self
        }
    }

    /// Override the WAL state store; `None` explicitly disables it.
    #[must_use]
    pub fn state_store(self, store: Option<Arc<dyn WalStateStore>>) -> Self {
        Self {
            state_store: Some(store),
            ..self
        }
    }

    /// Copy every override that was set onto `cfg`, leaving the other fields untouched.
    fn apply(&self, cfg: &mut RuntimeWalConfig) {
        // `Copy` knobs: keep the runtime value unless an override exists.
        cfg.segment_max_bytes = self.segment_max_bytes.unwrap_or(cfg.segment_max_bytes);
        cfg.segment_max_age = self.segment_max_age.unwrap_or(cfg.segment_max_age);
        cfg.flush_interval = self.flush_interval.unwrap_or(cfg.flush_interval);
        if let Some(policy) = &self.sync {
            cfg.sync = policy.clone();
        }
        cfg.recovery = self.recovery.unwrap_or(cfg.recovery);
        cfg.retention_bytes = self.retention_bytes.unwrap_or(cfg.retention_bytes);
        cfg.queue_size = self.queue_size.unwrap_or(cfg.queue_size);
        // Owned handles are cloned only when an override is present.
        if let Some(dir) = &self.wal_dir {
            cfg.dir = dir.clone();
        }
        if let Some(backend) = &self.segment_backend {
            cfg.segment_backend = Arc::clone(backend);
        }
        if let Some(store) = &self.state_store {
            cfg.state_store = store.clone();
        }
    }

    /// Layer `other` on top of `self`: each field set in `other` replaces ours.
    fn merge(&mut self, other: Self) {
        let Self {
            segment_max_bytes,
            segment_max_age,
            flush_interval,
            sync,
            recovery,
            retention_bytes,
            queue_size,
            wal_dir,
            segment_backend,
            state_store,
        } = other;
        self.segment_max_bytes = segment_max_bytes.or(self.segment_max_bytes.take());
        self.segment_max_age = segment_max_age.or(self.segment_max_age.take());
        self.flush_interval = flush_interval.or(self.flush_interval.take());
        self.sync = sync.or(self.sync.take());
        self.recovery = recovery.or(self.recovery.take());
        self.retention_bytes = retention_bytes.or(self.retention_bytes.take());
        self.queue_size = queue_size.or(self.queue_size.take());
        self.wal_dir = wal_dir.or(self.wal_dir.take());
        self.segment_backend = segment_backend.or(self.segment_backend.take());
        self.state_store = state_store.or(self.state_store.take());
    }
}
/// Table name registered in the manifest when the builder was not given one
/// (via `DbBuilder::table_name` or `StorageConfig::with_table_name`).
pub(super) const DEFAULT_TABLE_NAME: &str = "tonbo-default";
/// Durability class of the selected storage backend.
///
/// `Durable` storage is given WAL settings and is opened through
/// `recover_or_init_with_executor`. `Volatile` storage (e.g. `in_memory`)
/// gets neither.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DurabilityClass {
    /// Non-persistent storage; no WAL is configured.
    Volatile,
    /// Persistent storage; WAL settings are created for it.
    Durable,
}

impl DurabilityClass {
    /// Returns `true` only for [`DurabilityClass::Durable`].
    fn is_durable(self) -> bool {
        match self {
            DurabilityClass::Durable => true,
            DurabilityClass::Volatile => false,
        }
    }
}
/// Type-state marker for a [`DbBuilder`] whose storage backend has not been
/// selected yet.
#[derive(Debug, Default)]
pub struct Unconfigured;
/// Storage selection carried by a configured [`DbBuilder`].
///
/// Holds the filesystem handle and root path, plus the table name, WAL
/// overrides, durability class, and whether the directory layout should be
/// created before opening.
pub struct StorageConfig<FS> {
    fs: Arc<FS>,
    root: Path,
    table_name: Option<String>,
    wal_config: Option<WalConfig>,
    durability: DurabilityClass,
    create_layout: bool,
}

impl<FS> StorageConfig<FS> {
    /// Create a config for `fs` rooted at `root`.
    ///
    /// Durable storage starts with empty WAL overrides; volatile storage starts
    /// (and, through this type, stays) without WAL settings.
    pub fn new(fs: Arc<FS>, root: Path, durability: DurabilityClass) -> Self {
        let wal_config = durability.is_durable().then(WalConfig::default);
        Self {
            fs,
            root,
            table_name: None,
            wal_config,
            durability,
            create_layout: false,
        }
    }

    /// Set the table name registered in the manifest.
    #[must_use]
    pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
        self.table_name = Some(name.into());
        self
    }

    /// Merge `config` into the existing WAL overrides.
    ///
    /// If none exist yet, `config` is adopted only when the storage is
    /// durable; for volatile storage it is dropped.
    #[must_use]
    pub fn with_wal_config(mut self, config: WalConfig) -> Self {
        match self.wal_config {
            Some(ref mut existing) => existing.merge(config),
            None if self.durability.is_durable() => self.wal_config = Some(config),
            None => {}
        }
        self
    }

    /// Request (or stop requesting) creation of the directory layout in `prepare`.
    #[must_use]
    pub fn with_create_layout(mut self, enable: bool) -> Self {
        self.create_layout = enable;
        self
    }

    /// Durability class chosen at construction.
    pub fn durability(&self) -> DurabilityClass {
        self.durability
    }

    /// Configured table name, if any.
    pub fn table_name(&self) -> Option<&String> {
        Option::as_ref(&self.table_name)
    }

    /// Mutable access to the table name slot.
    pub fn table_name_mut(&mut self) -> &mut Option<String> {
        &mut self.table_name
    }

    /// WAL overrides, present only for durable storage.
    pub fn wal_config(&self) -> Option<&WalConfig> {
        Option::as_ref(&self.wal_config)
    }

    /// Mutable WAL overrides, present only for durable storage.
    pub fn wal_config_mut(&mut self) -> Option<&mut WalConfig> {
        Option::as_mut(&mut self.wal_config)
    }

    /// Whether `prepare` will create the directory layout.
    pub fn should_create_layout(&self) -> bool {
        self.create_layout
    }

    /// Create the `wal`/`sst`/`manifest` directory layout when requested; otherwise a no-op.
    ///
    /// # Errors
    /// Whatever `ensure_storage_layout` reports (empty root, directory
    /// creation or manifest directory failures).
    pub async fn prepare(&self) -> Result<(), DbBuildError>
    where
        FS: Fs,
    {
        if !self.create_layout {
            return Ok(());
        }
        ensure_storage_layout::<FS>(&self.root).await
    }

    /// Build the [`StorageLayout`] for this storage, using `fs` as the CAS handle too.
    ///
    /// # Errors
    /// `DbBuildError::InvalidPath` when the root path is empty.
    pub fn layout(&self) -> Result<StorageLayout<FS>, DbBuildError>
    where
        FS: DynFs + FusioCas + 'static,
    {
        let root_is_empty = self.root.as_ref().is_empty();
        if root_is_empty {
            return Err(DbBuildError::InvalidPath {
                path: self.root.to_string(),
                reason: "root cannot be empty".into(),
            });
        }
        let cas: Arc<dyn FusioCas> = self.fs.clone();
        let layout = StorageLayout::new(self.fs.clone(), Some(cas), self.root.clone());
        Ok(layout)
    }
}
/// Public extension point for applying [`WalConfig`] overrides to a builder
/// whose storage has already been selected.
pub mod wal_tuning {
    use fusio::{
        DynFs,
        dynamic::{MaybeSend, MaybeSync},
        fs::FsCas as FusioCas,
    };
    use super::{DbBuilder, StorageConfig, WalConfig};

    /// Applies WAL overrides to a builder.
    ///
    /// The inherent `DbBuilder::wal_config` is `pub(crate)`. Presumably this
    /// trait exists to expose it publicly — confirm with the crate's re-exports.
    pub trait WalConfigExt<FS>: Sized {
        /// Merge `overrides` into the builder's WAL settings and return the builder.
        fn wal_config(self, overrides: WalConfig) -> Self;
    }

    impl<FS> WalConfigExt<FS> for DbBuilder<StorageConfig<FS>>
    where
        FS: DynFs + FusioCas + Clone + MaybeSend + MaybeSync + 'static,
    {
        fn wal_config(self, overrides: WalConfig) -> Self {
            // Path call: inherent associated items take precedence over trait
            // items, so this forwards to the inherent method, not to itself.
            DbBuilder::wal_config(self, overrides)
        }
    }
}
/// Create the on-storage directory skeleton under `root`.
///
/// Creates `wal/` and `sst/`, then ensures the manifest directories for
/// `manifest/version`, `manifest/catalog`, and `manifest/gc`, in that order.
///
/// # Errors
/// - `DbBuildError::InvalidPath` if `root` is empty.
/// - `DbBuildError::PreparePath` if creating `wal/` or `sst/` fails.
/// - `DbBuildError::Manifest` if a manifest directory cannot be ensured.
async fn ensure_storage_layout<FS>(root: &Path) -> Result<(), DbBuildError>
where
    FS: Fs,
{
    if root.as_ref().is_empty() {
        return Err(DbBuildError::InvalidPath {
            path: root.to_string(),
            reason: "root cannot be empty".into(),
        });
    }

    /// `create_dir_all` on `dir`. The backend error is flattened into an
    /// `io::Error` carrying only its message.
    async fn create_all<F: Fs>(dir: &Path) -> Result<(), DbBuildError> {
        match F::create_dir_all(dir).await {
            Ok(()) => Ok(()),
            Err(err) => Err(DbBuildError::PreparePath {
                path: dir.to_string(),
                source: io::Error::other(err.to_string()),
            }),
        }
    }

    // The literal path parts below are always valid, so `expect` cannot fire.
    let wal_dir = root.child(PathPart::parse("wal").expect("wal part"));
    create_all::<FS>(&wal_dir).await?;
    let sst_dir = root.child(PathPart::parse("sst").expect("sst part"));
    create_all::<FS>(&sst_dir).await?;

    let manifest_root = root.child(PathPart::parse("manifest").expect("manifest part"));
    let manifest_dirs = [
        manifest_root.child(PathPart::parse("version").expect("version part")),
        manifest_root.child(PathPart::parse("catalog").expect("catalog part")),
        manifest_root.child(PathPart::parse("gc").expect("gc part")),
    ];
    for dir in &manifest_dirs {
        ensure_manifest_dirs::<FS>(dir)
            .await
            .map_err(DbBuildError::Manifest)?;
    }
    Ok(())
}
/// Type-state builder for [`DB`].
///
/// `S` is [`Unconfigured`] until a storage backend is selected (`in_memory`,
/// `on_disk`, `object_store`, ...). It then becomes `StorageConfig<FS>`,
/// which unlocks the open/build methods.
pub struct DbBuilder<S = Unconfigured> {
    /// Schema/key-extraction mode configuration.
    mode_config: DynModeConfig,
    /// Storage selection state (see the type parameter docs above).
    state: S,
    /// Major-compaction options; `None` means no compaction worker is spawned.
    compaction_options: Option<CompactionOptions>,
    /// Minor-compaction options; enabled with defaults by `DbBuilder::new`.
    minor_compaction: Option<MinorCompactionOptions>,
    /// Optional custom seal policy passed to `DbInner::set_seal_policy`.
    seal_policy: Option<Arc<dyn crate::inmem::policy::SealPolicy + Send + Sync>>,
}
/// Errors produced while configuring or opening a database through [`DbBuilder`].
#[derive(Debug, Error)]
pub enum DbBuildError {
    /// No storage backend was selected.
    #[error("storage backend not selected")]
    MissingStorage,
    /// A root, label, or prefix could not be used as a storage path.
    #[error("invalid storage path `{path}`: {reason}")]
    InvalidPath {
        /// The offending path as supplied or rendered.
        path: String,
        /// Human-readable reason it was rejected.
        reason: String,
    },
    /// Object-store backends are not implemented for this configuration.
    #[error("object-store backend support not implemented")]
    UnsupportedObjectStore,
    /// The object-store configuration was rejected.
    #[error("object-store configuration error: {reason}")]
    ObjectStoreConfig {
        /// Human-readable reason.
        reason: String,
    },
    /// The requested backend is unavailable on this build target.
    #[error("{backend} backend not supported for this build target")]
    UnsupportedBackend {
        /// Name of the backend that was requested.
        backend: &'static str,
    },
    /// Schema/key-extraction configuration error.
    #[error(transparent)]
    Mode(#[from] KeyExtractError),
    /// Manifest initialisation or access error.
    #[error(transparent)]
    Manifest(#[from] ManifestError),
    /// WAL setup error.
    #[error(transparent)]
    Wal(#[from] WalError),
    /// Creating a directory of the storage layout failed.
    #[error("failed to prepare directory `{path}`: {source}")]
    PreparePath {
        /// Directory that could not be created.
        path: String,
        /// Underlying failure (the backend error flattened into an `io::Error`).
        #[source]
        source: std::io::Error,
    },
}
/// Description of a remote object-store backend, consumed by
/// `DbBuilder::object_store`.
#[derive(Debug, Clone)]
pub enum ObjectSpec {
    /// An S3 bucket (optionally behind a custom endpoint).
    S3(S3Spec),
}

impl ObjectSpec {
    /// Wrap an [`S3Spec`].
    #[must_use]
    pub fn s3(spec: S3Spec) -> Self {
        ObjectSpec::S3(spec)
    }
}
/// Connection settings for an S3 bucket used as the storage backend.
///
/// `None` options are left at the S3 builder's defaults by `build_s3_fs`.
/// The exception is `region`, which falls back to `us-east-1`.
#[derive(Debug, Clone)]
pub struct S3Spec {
    /// Bucket name.
    pub bucket: String,
    /// Key prefix the database lives under; parsed as a storage path.
    pub prefix: String,
    /// KMS key id. NOTE(review): not read by `build_s3_fs` in this module.
    pub kms_key: Option<String>,
    /// Credentials used to sign requests.
    pub credentials: AwsCreds,
    /// Custom endpoint URL, if any.
    pub endpoint: Option<String>,
    /// Region; `build_s3_fs` defaults it to `us-east-1`.
    pub region: Option<String>,
    /// Forwarded to the builder's `sign_payload` when set.
    pub sign_payload: Option<bool>,
    /// Forwarded to the builder's `checksum` when set.
    pub checksum: Option<bool>,
    /// NOTE(review): not read by `build_s3_fs` in this module.
    pub versioned: Option<bool>,
    /// Forwarded to the builder's `s3_express` when set.
    pub s3_express: Option<bool>,
}

impl S3Spec {
    /// Spec for `bucket`/`prefix` with `credentials` and every optional setting unset.
    #[must_use]
    pub fn new(
        bucket: impl Into<String>,
        prefix: impl Into<String>,
        credentials: AwsCreds,
    ) -> Self {
        let bucket = bucket.into();
        let prefix = prefix.into();
        Self {
            bucket,
            prefix,
            credentials,
            kms_key: None,
            endpoint: None,
            region: None,
            sign_payload: None,
            checksum: None,
            versioned: None,
            s3_express: None,
        }
    }
}
/// Static AWS credentials used to sign S3 requests.
///
/// `Debug` is implemented by hand so that `secret_key` and `session_token`
/// are redacted. A derived impl would print them verbatim, including
/// transitively through `S3Spec`'s derived `Debug`, and so leak secrets into
/// logs. The access key id stays visible as an identifier to aid debugging.
#[derive(Clone)]
pub struct AwsCreds {
    /// Access key id.
    pub access_key: String,
    /// Secret access key (redacted in `Debug` output).
    pub secret_key: String,
    /// Optional session token (redacted in `Debug` output).
    pub session_token: Option<String>,
}

impl std::fmt::Debug for AwsCreds {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AwsCreds")
            .field("access_key", &self.access_key)
            .field("secret_key", &"<redacted>")
            .field(
                "session_token",
                &self.session_token.as_ref().map(|_| "<redacted>"),
            )
            .finish()
    }
}
impl AwsCreds {
    /// Credentials without a session token.
    #[must_use]
    pub fn new(access_key: impl Into<String>, secret_key: impl Into<String>) -> Self {
        Self {
            access_key: access_key.into(),
            secret_key: secret_key.into(),
            session_token: None,
        }
    }

    /// Credentials that also carry a session token.
    #[must_use]
    pub fn with_session_token(
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
        token: impl Into<String>,
    ) -> Self {
        Self {
            access_key: access_key.into(),
            secret_key: secret_key.into(),
            session_token: Some(token.into()),
        }
    }

    /// Read credentials from the standard AWS environment variables.
    ///
    /// `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are required.
    /// `AWS_SESSION_TOKEN` is optional and is treated as absent when it is
    /// unset, not valid Unicode, or empty.
    ///
    /// # Errors
    /// [`AwsCredsError::MissingEnv`] naming the first required variable that
    /// is unset or not valid Unicode.
    pub fn from_env() -> Result<Self, AwsCredsError> {
        let access_key = env::var("AWS_ACCESS_KEY_ID").map_err(|_| AwsCredsError::MissingEnv {
            var: "AWS_ACCESS_KEY_ID",
        })?;
        let secret_key =
            env::var("AWS_SECRET_ACCESS_KEY").map_err(|_| AwsCredsError::MissingEnv {
                var: "AWS_SECRET_ACCESS_KEY",
            })?;
        // Shells often leave `AWS_SESSION_TOKEN=` exported but empty after a
        // session is cleared. Keeping `Some("")` would presumably be forwarded
        // as an empty security token rather than no token, so treat it as unset.
        let session_token = env::var("AWS_SESSION_TOKEN")
            .ok()
            .filter(|token| !token.is_empty());
        Ok(Self {
            access_key,
            secret_key,
            session_token,
        })
    }
}
/// Errors from [`AwsCreds::from_env`].
#[derive(Debug, Error)]
pub enum AwsCredsError {
    /// A required credential variable could not be read (unset or not valid Unicode).
    #[error("missing AWS credential environment variable `{var}`")]
    MissingEnv {
        /// Name of the environment variable that could not be read.
        var: &'static str,
    },
}
/// One sub-tree of the storage layout (e.g. the `wal` or `sst` directory):
/// the filesystem handle to use and the directory path.
#[derive(Clone)]
struct StorageRoute {
    /// Filesystem for files under `path`.
    fs: Arc<dyn DynFs>,
    /// Directory this route points at.
    path: Path,
    /// CAS-capable handle when available; `apply_wal_defaults` uses it to
    /// build the WAL state store.
    cas: Option<Arc<dyn FusioCas>>,
}
/// Resolved storage for one database: the concrete filesystem, a type-erased
/// view of it, an optional CAS handle, and the root path that the `wal/` and
/// `sst/` routes hang off.
#[derive(Clone)]
pub struct StorageLayout<FS> {
    fs: Arc<FS>,
    dyn_fs: Arc<dyn DynFs>,
    cas: Option<Arc<dyn FusioCas>>,
    root: Path,
}

impl<FS> StorageLayout<FS> {
    /// Wrap `fs`, keeping both the concrete and the `dyn DynFs` handle.
    fn new(fs: Arc<FS>, cas: Option<Arc<dyn FusioCas>>, root: Path) -> Self
    where
        FS: DynFs + 'static,
    {
        let dyn_fs: Arc<dyn DynFs> = fs.clone();
        Self {
            fs,
            dyn_fs,
            cas,
            root,
        }
    }

    /// Type-erased filesystem handle (cheap `Arc` clone).
    fn dyn_fs(&self) -> Arc<dyn DynFs> {
        Arc::clone(&self.dyn_fs)
    }

    /// Root path of the layout.
    fn root(&self) -> &Path {
        &self.root
    }

    /// Route to the directory `name` directly under the root.
    ///
    /// Shared by [`wal_route`](Self::wal_route) and [`sst_route`](Self::sst_route).
    fn child_route(&self, name: &'static str) -> Result<StorageRoute, DbBuildError> {
        let part = PathPart::parse(name).map_err(|err| DbBuildError::InvalidPath {
            path: name.into(),
            reason: err.to_string(),
        })?;
        Ok(StorageRoute {
            fs: Arc::clone(&self.dyn_fs),
            path: self.root.child(part),
            cas: self.cas.clone(),
        })
    }

    /// Route to `<root>/wal`.
    fn wal_route(&self) -> Result<StorageRoute, DbBuildError> {
        self.child_route("wal")
    }

    /// Route to `<root>/sst`.
    fn sst_route(&self) -> Result<StorageRoute, DbBuildError> {
        self.child_route("sst")
    }

    /// Point `cfg` at this layout's WAL route.
    ///
    /// Sets the directory and segment backend, and sets the state store to a
    /// `FsWalStateStore` over the CAS handle (or `None` when there is no CAS).
    // The lint is silenced presumably because `FsWalStateStore` is not
    // `Send + Sync` on every target — confirm.
    #[allow(clippy::arc_with_non_send_sync)]
    fn apply_wal_defaults(&self, cfg: &mut RuntimeWalConfig) -> Result<(), DbBuildError> {
        // The route is owned, so move its parts instead of cloning them.
        let StorageRoute { fs, path, cas } = self.wal_route()?;
        cfg.dir = path;
        cfg.segment_backend = fs;
        cfg.state_store =
            cas.map(|cas| Arc::new(FsWalStateStore::new(cas)) as Arc<dyn WalStateStore>);
        Ok(())
    }
}
#[allow(clippy::arc_with_non_send_sync)]
fn build_s3_fs(
spec: S3Spec,
) -> Result<(Arc<fusio::impls::remotes::aws::fs::AmazonS3>, Path), DbBuildError> {
use fusio::impls::remotes::aws::{credential::AwsCredential, fs::AmazonS3Builder};
let region = spec.region.clone().unwrap_or_else(|| "us-east-1".into());
let mut builder = AmazonS3Builder::new(spec.bucket.clone()).region(region);
if let Some(endpoint) = &spec.endpoint {
builder = builder.endpoint(endpoint.clone());
}
if let Some(s3_express) = spec.s3_express {
builder = builder.s3_express(s3_express);
}
let credential = AwsCredential {
key_id: spec.credentials.access_key.clone(),
secret_key: spec.credentials.secret_key.clone(),
token: spec.credentials.session_token.clone(),
};
builder = builder.credential(credential);
if let Some(sign) = spec.sign_payload {
builder = builder.sign_payload(sign);
}
if let Some(checksum) = spec.checksum {
builder = builder.checksum(checksum);
}
let fs = Arc::new(builder.build());
let root = if spec.prefix.is_empty() {
Path::default()
} else {
Path::parse(&spec.prefix).map_err(|err| DbBuildError::InvalidPath {
path: spec.prefix.clone(),
reason: err.to_string(),
})?
};
Ok((fs, root))
}
/// Report whether any WAL segment exists in `cfg.dir` on `cfg.segment_backend`.
///
/// # Errors
/// Propagates listing failures as `DbBuildError::Wal`.
async fn wal_segments_exist(cfg: &RuntimeWalConfig) -> Result<bool, DbBuildError> {
    let backend = Arc::clone(&cfg.segment_backend);
    let listed = WalStorage::new(backend, cfg.dir.clone())
        .list_segments()
        .await?;
    Ok(!listed.is_empty())
}
/// Initialises the filesystem-backed manifest rooted at a [`StorageLayout`].
struct ManifestBootstrap<'a, FS> {
    layout: &'a StorageLayout<FS>,
}

impl<'a, FS> ManifestBootstrap<'a, FS> {
    /// Borrow `layout` for manifest initialisation.
    fn new(layout: &'a StorageLayout<FS>) -> Self {
        ManifestBootstrap { layout }
    }

    /// Call `init_fs_manifest` with a clone of the layout's filesystem, the
    /// layout root, and `executor`.
    ///
    /// # Errors
    /// Manifest failures are wrapped in `DbBuildError::Manifest`.
    async fn init_manifest<E>(&self, executor: E) -> Result<TonboManifest<FS, E>, DbBuildError>
    where
        FS: ManifestFs<E>,
        E: Executor + Timer + Clone + 'static,
        HeadStoreImpl<FS>: fusio_manifest::HeadStore,
        SegmentStoreImpl<FS>: fusio_manifest::SegmentIo,
        CheckpointStoreImpl<FS>: fusio_manifest::CheckpointStore,
        LeaseStoreImpl<FS, E>: fusio_manifest::LeaseStore,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        let layout = self.layout;
        let fs = Arc::as_ref(&layout.fs).clone();
        let outcome = init_fs_manifest(fs, layout.root(), executor).await;
        outcome.map_err(DbBuildError::Manifest)
    }
}
impl DbBuilder<Unconfigured> {
    /// Create a builder for `mode_config` with no storage selected yet.
    ///
    /// Minor compaction starts enabled with `MinorCompactionOptions::default()`.
    /// Major compaction options and the seal policy start unset.
    pub(super) fn new(mode_config: DynModeConfig) -> Self {
        Self {
            mode_config,
            state: Unconfigured,
            compaction_options: None,
            minor_compaction: Some(MinorCompactionOptions::default()),
            seal_policy: None,
        }
    }

    /// Carry every non-storage setting over into a builder whose state is `state`.
    ///
    /// Shared by all storage selectors, so adding a builder field only needs
    /// one update here instead of one per selector.
    fn with_storage_state<FS>(self, state: StorageConfig<FS>) -> DbBuilder<StorageConfig<FS>> {
        DbBuilder {
            mode_config: self.mode_config,
            state,
            compaction_options: self.compaction_options,
            minor_compaction: self.minor_compaction,
            seal_policy: self.seal_policy,
        }
    }

    /// Select a fresh volatile in-memory filesystem rooted at `label`.
    ///
    /// The resulting storage has no WAL settings and does not request layout creation.
    ///
    /// # Errors
    /// `DbBuildError::InvalidPath` when `label` does not parse as a storage path.
    pub fn in_memory(
        self,
        label: impl Into<String>,
    ) -> Result<DbBuilder<StorageConfig<InMemoryFs>>, DbBuildError> {
        let label_str = label.into();
        let root = Path::parse(&label_str).map_err(|err| DbBuildError::InvalidPath {
            path: label_str,
            reason: err.to_string(),
        })?;
        let fs = Arc::new(InMemoryFs::new());
        Ok(self.with_storage_state(StorageConfig::new(fs, root, DurabilityClass::Volatile)))
    }

    /// Select durable local-disk storage rooted at `root`.
    ///
    /// # Errors
    /// See [`on_durable_fs`](Self::on_durable_fs).
    #[must_use = "use the returned DbBuilder to continue configuration"]
    #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))]
    pub fn on_disk(
        self,
        root: impl AsRef<std::path::Path>,
    ) -> Result<DbBuilder<StorageConfig<LocalFs>>, DbBuildError> {
        self.on_durable_fs(Arc::new(LocalFs {}), root)
    }

    /// Select durable storage on `fs` rooted at the filesystem path `root`.
    ///
    /// Layout creation is requested, so the `wal`/`sst`/`manifest` directories
    /// are created when the storage is prepared.
    ///
    /// # Errors
    /// `DbBuildError::InvalidPath` when `root` cannot be converted to a storage path.
    #[must_use = "use the returned DbBuilder to continue configuration"]
    #[cfg(all(feature = "tokio", not(target_arch = "wasm32")))]
    pub fn on_durable_fs<FS>(
        self,
        fs: Arc<FS>,
        root: impl AsRef<std::path::Path>,
    ) -> Result<DbBuilder<StorageConfig<FS>>, DbBuildError>
    where
        FS: DynFs + FusioCas + Clone + MaybeSend + MaybeSync + 'static,
    {
        let root_ref = root.as_ref();
        let path =
            Path::from_filesystem_path(root_ref).map_err(|err| DbBuildError::InvalidPath {
                path: root_ref.display().to_string(),
                reason: err.to_string(),
            })?;
        let state = StorageConfig::new(fs, path, DurabilityClass::Durable).with_create_layout(true);
        Ok(self.with_storage_state(state))
    }

    /// Select a durable object-store backend described by `spec`.
    ///
    /// Layout creation is not requested for object stores.
    ///
    /// # Errors
    /// Whatever `build_s3_fs` reports (e.g. an unparsable prefix).
    pub fn object_store(
        self,
        spec: ObjectSpec,
    ) -> Result<DbBuilder<StorageConfig<fusio::impls::remotes::aws::fs::AmazonS3>>, DbBuildError>
    {
        let (fs, root) = match spec {
            ObjectSpec::S3(s3_spec) => build_s3_fs(s3_spec)?,
        };
        Ok(self.with_storage_state(StorageConfig::new(fs, root, DurabilityClass::Durable)))
    }

    /// Select durable storage on a caller-provided object-store filesystem at `root`.
    ///
    /// Layout creation is not requested. Currently this never returns `Err`.
    #[must_use = "use the returned DbBuilder to continue configuration"]
    pub fn object_store_with_fs<FS>(
        self,
        fs: Arc<FS>,
        root: Path,
    ) -> Result<DbBuilder<StorageConfig<FS>>, DbBuildError>
    where
        FS: DynFs + FusioCas + Clone + MaybeSend + MaybeSync + 'static,
    {
        Ok(self.with_storage_state(StorageConfig::new(fs, root, DurabilityClass::Durable)))
    }

    /// Builder for `schema` keyed on the column named `key_name`.
    ///
    /// # Errors
    /// `DbBuildError::Mode` when the key column cannot be resolved.
    pub fn from_schema_key_name(
        schema: arrow_schema::SchemaRef,
        key_name: impl Into<String>,
    ) -> Result<Self, DbBuildError> {
        let key = key_name.into();
        let cfg = DynModeConfig::from_key_name(schema, key.as_str()).map_err(DbBuildError::Mode)?;
        Ok(Self::new(cfg))
    }

    /// Builder for `schema` keyed on the columns at `key_indices`.
    ///
    /// # Errors
    /// `DbBuildError::Mode` when the projection or mode config cannot be built.
    pub fn from_schema_key_indices(
        schema: arrow_schema::SchemaRef,
        key_indices: Vec<usize>,
    ) -> Result<Self, DbBuildError> {
        let extractor =
            projection_for_columns(schema.clone(), key_indices).map_err(DbBuildError::Mode)?;
        let cfg = DynModeConfig::new(schema, extractor).map_err(DbBuildError::Mode)?;
        Ok(Self::new(cfg))
    }

    /// Builder whose key is derived from `schema`'s metadata.
    ///
    /// # Errors
    /// `DbBuildError::Mode` when the metadata does not describe a usable key.
    pub fn from_schema(schema: arrow_schema::SchemaRef) -> Result<Self, DbBuildError> {
        let cfg = DynModeConfig::from_metadata(schema).map_err(DbBuildError::Mode)?;
        Ok(Self::new(cfg))
    }

    /// Alias for [`from_schema`](Self::from_schema).
    pub fn from_schema_metadata(schema: arrow_schema::SchemaRef) -> Result<Self, DbBuildError> {
        Self::from_schema(schema)
    }
}
// Builder methods available once a storage backend has been selected
// (`state` is a `StorageConfig<FS>`).
impl<FS> DbBuilder<StorageConfig<FS>>
where
    FS: DynFs + FusioCas + Clone + MaybeSend + MaybeSync + 'static,
{
    /// Replace the major-compaction options.
    ///
    /// When options are present, `build_with_layout` spawns a compaction worker from them.
    #[must_use]
    pub fn with_compaction_options(mut self, options: CompactionOptions) -> Self {
        self.compaction_options = Some(options);
        self
    }

    /// Set the compaction strategy.
    ///
    /// Starts from `CompactionOptions::default()` when no options were
    /// configured yet, so this also enables major compaction.
    #[must_use]
    pub fn with_compaction_strategy(mut self, strategy: CompactionStrategy) -> Self {
        let mut options = self.compaction_options.unwrap_or_default();
        options.strategy = strategy;
        self.compaction_options = Some(options);
        self
    }

    /// Enable minor compaction with the given segment threshold and target
    /// level, replacing any previous minor-compaction settings.
    #[must_use]
    pub fn with_minor_compaction(mut self, segment_threshold: usize, target_level: usize) -> Self {
        self.minor_compaction = Some(MinorCompactionOptions {
            segment_threshold,
            target_level,
        });
        self
    }

    /// Turn minor compaction off (`DbBuilder::new` enables it with defaults).
    #[must_use]
    pub fn disable_minor_compaction(mut self) -> Self {
        self.minor_compaction = None;
        self
    }

    /// Install a custom seal policy, handed to `DbInner::set_seal_policy` at build time.
    #[must_use]
    pub fn with_seal_policy(
        mut self,
        policy: Arc<dyn crate::inmem::policy::SealPolicy + Send + Sync>,
    ) -> Self {
        self.seal_policy = Some(policy);
        self
    }

    /// Build minor-compaction state.
    ///
    /// Pairs an `SsTableConfig` rooted at the layout's `sst` route (with the
    /// key extractor and `cfg.target_level`) with a `MinorCompactor` that
    /// draws SST ids from the shared `id_allocator`.
    #[allow(clippy::arc_with_non_send_sync)]
    fn build_minor_compaction_state(
        layout: &StorageLayout<FS>,
        cfg: &MinorCompactionOptions,
        id_allocator: Arc<AtomicU64>,
        schema: SchemaRef,
        extractor: Arc<dyn KeyProjection>,
    ) -> Result<MinorCompactionState, DbBuildError>
    where
        FS: DynFs + FusioCas + 'static,
    {
        let route = layout.sst_route()?;
        let config = Arc::new(
            SsTableConfig::new(schema, route.fs, route.path.clone())
                .with_key_extractor(extractor)
                .with_target_level(cfg.target_level),
        );
        let compactor = MinorCompactor::with_id_allocator(
            cfg.segment_threshold,
            cfg.target_level,
            id_allocator,
        );
        Ok(MinorCompactionState::new(compactor, config))
    }

    /// Set the table name registered in the manifest (defaults to `DEFAULT_TABLE_NAME`).
    #[must_use]
    pub fn table_name(mut self, name: impl Into<String>) -> Self {
        *self.state.table_name_mut() = Some(name.into());
        self
    }

    /// Open the database on a default `TokioExecutor`.
    #[cfg(feature = "tokio")]
    pub async fn open(self) -> Result<DB<FS, TokioExecutor>, DbBuildError>
    where
        FS: ManifestFs<TokioExecutor>,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        let executor = Arc::new(TokioExecutor::default());
        self.open_with_executor(executor).await
    }

    /// Alias for [`open`](Self::open).
    #[cfg(feature = "tokio")]
    pub async fn build(self) -> Result<DB<FS, TokioExecutor>, DbBuildError>
    where
        FS: ManifestFs<TokioExecutor>,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        self.open().await
    }

    /// Open the database on `executor`.
    ///
    /// Durable storage goes through `recover_or_init_with_executor`, which
    /// prepares the layout itself. Volatile storage has its layout prepared
    /// here and is then built directly from the layout.
    ///
    /// # Errors
    /// Any [`DbBuildError`] from layout preparation, manifest setup, or WAL setup.
    pub async fn open_with_executor<E>(self, executor: Arc<E>) -> Result<DB<FS, E>, DbBuildError>
    where
        E: Executor + Timer + Clone + 'static,
        FS: ManifestFs<E>,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        if self.state.durability().is_durable() {
            return self.recover_or_init_with_executor(executor).await;
        }
        self.state.prepare().await?;
        let layout = self.state.layout()?;
        self.build_with_layout(executor, layout).await
    }

    /// Assemble a [`DB`] from a prepared `layout`.
    ///
    /// Steps, in order:
    /// 1. Initialise the manifest and register the table.
    /// 2. Derive the SST id allocator and the minor/major compaction state.
    /// 3. Build the WAL config (durable storage only).
    /// 4. Construct `DbInner`, apply the seal policy and compaction knobs,
    ///    enable the WAL, and spawn the compaction worker if configured.
    async fn build_with_layout<E>(
        self,
        executor: Arc<E>,
        layout: StorageLayout<FS>,
    ) -> Result<DB<FS, E>, DbBuildError>
    where
        E: Executor + Timer + Clone + 'static,
        FS: ManifestFs<E> + fusio_manifest::ObjectHead,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        // Move the builder apart so each piece can be consumed independently.
        let DbBuilder {
            mode_config,
            state,
            compaction_options,
            minor_compaction,
            seal_policy,
        } = self;
        let manifest_init = ManifestBootstrap::new(&layout);
        let file_ids = FileIdGenerator::default();
        let table_name = state
            .table_name()
            .cloned()
            .unwrap_or_else(|| DEFAULT_TABLE_NAME.to_string());
        let table_definition = table_definition(&mode_config, &table_name);
        // Captured before `mode_config.build()` consumes the mode config.
        let sstable_schema = mode_config.schema();
        let sstable_extractor = Arc::clone(&mode_config.extractor);
        let (schema, delete_schema, commit_ack_mode, mem) =
            mode_config.build().map_err(DbBuildError::Mode)?;
        let manifest = manifest_init
            .init_manifest(executor.as_ref().clone())
            .await?;
        let table_meta = manifest
            .register_table(&file_ids, &table_definition)
            .await
            .map_err(DbBuildError::Manifest)?;
        let manifest_table = table_meta.table_id;
        // The SST id allocator is only derived from the manifest when some
        // compaction path (minor or major) is configured. Both share it.
        let sstable_id_allocator = if minor_compaction.is_some() || compaction_options.is_some() {
            Some(Self::sstable_id_allocator_for_table(&manifest, &table_meta).await?)
        } else {
            None
        };
        let minor_compaction_state = if let (Some(cfg), Some(id_allocator)) =
            (minor_compaction.as_ref(), sstable_id_allocator.as_ref())
        {
            Some(Self::minor_compaction_state_for_table(
                cfg,
                &layout,
                Arc::clone(id_allocator),
                Arc::clone(&sstable_schema),
                Arc::clone(&sstable_extractor),
            )?)
        } else {
            None
        };
        let compaction_state = if let (Some(_), Some(id_allocator)) =
            (compaction_options.as_ref(), sstable_id_allocator.as_ref())
        {
            Some(Self::compaction_worker_state_for_table(
                &layout,
                Arc::clone(id_allocator),
                Arc::clone(&sstable_schema),
                Arc::clone(&sstable_extractor),
            )?)
        } else {
            None
        };
        // WAL config for durable storage only: runtime defaults, then the
        // layout's `wal` route, then user overrides.
        let mut wal_cfg = if state.durability().is_durable() {
            let mut cfg = RuntimeWalConfig::default();
            layout.apply_wal_defaults(&mut cfg)?;
            if let Some(overrides) = state.wal_config() {
                overrides.apply(&mut cfg);
            }
            Some(cfg)
        } else {
            None
        };
        // `from_components` gets a clone; the original is handed to `enable_wal` below.
        let mut inner = DbInner::from_components(
            schema,
            delete_schema,
            commit_ack_mode,
            mem,
            layout.dyn_fs(),
            layout.sst_route()?.path,
            manifest,
            manifest_table,
            table_meta,
            wal_cfg.clone(),
            executor,
        );
        inner.minor_compaction = minor_compaction_state;
        if let Some(policy) = seal_policy {
            inner.set_seal_policy(policy);
        }
        if let Some(options) = compaction_options.as_ref() {
            inner.l0_backpressure = options.backpressure().cloned();
            inner.cas_backoff = options.cas_backoff_config().clone();
            inner.compaction_metrics = options.metrics();
        }
        if let Some(cfg) = wal_cfg.take() {
            inner.enable_wal(cfg).await?;
        }
        // Spawn the major-compaction worker only when options were given (and
        // therefore an SST config/id allocator were built above).
        if let (Some(options), Some((sst_config, id_allocator))) =
            (compaction_options, compaction_state)
        {
            let planner = options.strategy.clone().build();
            let mut exec =
                LocalCompactionExecutor::with_id_allocator(Arc::clone(&sst_config), id_allocator);
            if let Some(max_rows) = options.max_output_rows {
                exec = exec.with_max_output_rows(max_rows);
            }
            if let Some(max_bytes) = options.max_output_bytes {
                exec = exec.with_max_output_bytes(max_bytes);
            }
            let driver = Arc::new(inner.compaction_driver());
            let worker_config = CompactionWorkerConfig::new(
                options.effective_tick(),
                options.effective_queue_capacity(),
                options.effective_concurrency(),
                options.cascade_config().clone(),
            );
            let handle =
                driver.spawn_worker(Arc::clone(&inner.executor), planner, exec, worker_config);
            inner.compaction_worker = Some(handle);
            // Presumably triggers an initial planning pass — confirm in `DbInner`.
            inner.kick_compaction_worker();
        }
        Ok(DB::from_inner(Arc::new(inner)))
    }
}
impl<S> DbBuilder<S> {
    /// Set the commit acknowledgement mode, whatever the storage state.
    #[must_use]
    pub fn with_commit_ack_mode(self, mode: CommitAckMode) -> Self {
        let mut builder = self;
        builder.mode_config.commit_ack_mode = mode;
        builder
    }
}
/// Major-compaction settings.
///
/// When a `DbBuilder` carries these, `build_with_layout` spawns a background
/// compaction worker configured from them.
#[derive(Clone, Debug)]
pub struct CompactionOptions {
    strategy: CompactionStrategy,
    max_concurrent_jobs: usize,
    queue_capacity: usize,
    max_output_rows: Option<usize>,
    max_output_bytes: Option<usize>,
    periodic_tick: Option<Duration>,
    backpressure: Option<L0BackpressureConfig>,
    cascade: CascadeConfig,
    cas_backoff: CasBackoffConfig,
    compaction_metrics: Option<Arc<CompactionMetrics>>,
}

impl Default for CompactionOptions {
    fn default() -> Self {
        Self {
            strategy: CompactionStrategy::default(),
            backpressure: Some(L0BackpressureConfig::default()),
            cascade: CascadeConfig::default(),
            cas_backoff: CasBackoffConfig::default(),
            max_concurrent_jobs: 1,
            queue_capacity: 1,
            max_output_rows: None,
            max_output_bytes: None,
            periodic_tick: None,
            compaction_metrics: None,
        }
    }
}

impl CompactionOptions {
    /// Same as [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the compaction planning strategy.
    #[must_use]
    pub fn strategy(self, strategy: CompactionStrategy) -> Self {
        Self { strategy, ..self }
    }

    /// Maximum concurrent compaction jobs (clamped to at least 1).
    #[must_use]
    pub fn max_concurrent_jobs(self, jobs: usize) -> Self {
        Self {
            max_concurrent_jobs: jobs.max(1),
            ..self
        }
    }

    /// Worker queue capacity (clamped to at least 1).
    #[must_use]
    pub fn queue_capacity(self, capacity: usize) -> Self {
        Self {
            queue_capacity: capacity.max(1),
            ..self
        }
    }

    /// Cap on rows per output SST (clamped to at least 1).
    #[must_use]
    pub fn max_output_rows(self, rows: usize) -> Self {
        Self {
            max_output_rows: Some(rows.max(1)),
            ..self
        }
    }

    /// Cap on bytes per output SST (clamped to at least 1).
    #[must_use]
    pub fn max_output_bytes(self, bytes: usize) -> Self {
        Self {
            max_output_bytes: Some(bytes.max(1)),
            ..self
        }
    }

    /// Periodic wake-up interval for the worker; a zero interval disables it.
    #[must_use]
    pub fn periodic_tick(self, interval: Duration) -> Self {
        Self {
            periodic_tick: (!interval.is_zero()).then_some(interval),
            ..self
        }
    }

    /// Replace the L0 backpressure configuration.
    #[must_use]
    pub fn l0_backpressure(self, config: L0BackpressureConfig) -> Self {
        Self {
            backpressure: Some(config),
            ..self
        }
    }

    /// Turn L0 backpressure off.
    #[must_use]
    pub fn disable_l0_backpressure(self) -> Self {
        Self {
            backpressure: None,
            ..self
        }
    }

    /// Replace the cascade (follow-up compaction) configuration.
    #[must_use]
    pub fn cascade(self, config: CascadeConfig) -> Self {
        Self {
            cascade: config,
            ..self
        }
    }

    /// Replace the CAS retry backoff configuration.
    #[must_use]
    pub fn cas_backoff(self, config: CasBackoffConfig) -> Self {
        Self {
            cas_backoff: config,
            ..self
        }
    }

    /// Attach a metrics sink (internal/testing hook).
    #[must_use]
    #[doc(hidden)]
    pub fn compaction_metrics(self, metrics: Arc<CompactionMetrics>) -> Self {
        Self {
            compaction_metrics: Some(metrics),
            ..self
        }
    }

    /// Job concurrency, never below 1.
    fn effective_concurrency(&self) -> usize {
        std::cmp::max(self.max_concurrent_jobs, 1)
    }

    /// Queue capacity, at least one slot per concurrent job (and never below 1).
    fn effective_queue_capacity(&self) -> usize {
        let per_job_floor = self.queue_capacity.max(self.effective_concurrency());
        per_job_floor.max(1)
    }

    /// Periodic tick, treating a zero interval as "no tick".
    fn effective_tick(&self) -> Option<Duration> {
        match self.periodic_tick {
            Some(interval) if !interval.is_zero() => Some(interval),
            _ => None,
        }
    }

    /// L0 backpressure configuration, if enabled.
    fn backpressure(&self) -> Option<&L0BackpressureConfig> {
        self.backpressure.as_ref()
    }

    /// Cascade configuration.
    fn cascade_config(&self) -> &CascadeConfig {
        &self.cascade
    }

    /// CAS backoff configuration.
    fn cas_backoff_config(&self) -> &CasBackoffConfig {
        &self.cas_backoff
    }

    /// Shared handle to the metrics sink, if one was attached.
    fn metrics(&self) -> Option<Arc<CompactionMetrics>> {
        self.compaction_metrics.as_ref().map(Arc::clone)
    }
}
/// Write-throttling thresholds driven by the number (and optionally total
/// bytes) of L0 files.
///
/// Invariants kept by the constructors and setters:
/// - `slowdown_files >= 1` and `stop_files >= slowdown_files`.
/// - Byte limits are at least 1.
/// - When both byte limits are set, `stop_bytes >= slowdown_bytes`.
#[derive(Clone, Debug)]
pub struct L0BackpressureConfig {
    slowdown_files: usize,
    stop_files: usize,
    slowdown_bytes: Option<usize>,
    stop_bytes: Option<usize>,
    slowdown_delay: Duration,
    stop_delay: Duration,
}

impl Default for L0BackpressureConfig {
    fn default() -> Self {
        Self {
            slowdown_delay: Duration::from_millis(25),
            stop_delay: Duration::from_millis(200),
            slowdown_files: 16,
            stop_files: 32,
            slowdown_bytes: None,
            stop_bytes: None,
        }
    }
}

impl L0BackpressureConfig {
    /// File-count thresholds with default delays and no byte limits.
    ///
    /// `slowdown_files` is clamped to at least 1, and `stop_files` to at
    /// least the (clamped) slowdown threshold.
    #[must_use]
    pub fn new(slowdown_files: usize, stop_files: usize) -> Self {
        let slowdown = slowdown_files.max(1);
        Self {
            slowdown_files: slowdown,
            stop_files: stop_files.max(slowdown),
            ..Self::default()
        }
    }

    /// Delay applied in the slowdown band; a zero delay is ignored.
    #[must_use]
    pub fn slowdown_delay(self, delay: Duration) -> Self {
        if delay.is_zero() {
            self
        } else {
            Self {
                slowdown_delay: delay,
                ..self
            }
        }
    }

    /// Delay applied at the stop threshold; a zero delay is ignored.
    #[must_use]
    pub fn stop_delay(self, delay: Duration) -> Self {
        if delay.is_zero() {
            self
        } else {
            Self {
                stop_delay: delay,
                ..self
            }
        }
    }

    /// Byte threshold for slowdown (at least 1).
    ///
    /// Raises an existing stop limit so it stays at or above this one.
    #[must_use]
    pub fn slowdown_bytes(self, bytes: usize) -> Self {
        let slowdown = bytes.max(1);
        let stop_bytes = self.stop_bytes.map(|stop| stop.max(slowdown));
        Self {
            slowdown_bytes: Some(slowdown),
            stop_bytes,
            ..self
        }
    }

    /// Byte threshold for stopping (at least 1, and at least the slowdown limit if set).
    #[must_use]
    pub fn stop_bytes(self, bytes: usize) -> Self {
        let requested = bytes.max(1);
        let stop = self
            .slowdown_bytes
            .map_or(requested, |slowdown| requested.max(slowdown));
        Self {
            stop_bytes: Some(stop),
            ..self
        }
    }

    pub(crate) fn slowdown_files(&self) -> usize {
        self.slowdown_files
    }

    pub(crate) fn stop_files(&self) -> usize {
        self.stop_files
    }

    pub(crate) fn slowdown_bytes_limit(&self) -> Option<usize> {
        self.slowdown_bytes
    }

    pub(crate) fn stop_bytes_limit(&self) -> Option<usize> {
        self.stop_bytes
    }

    pub(crate) fn slowdown_delay_value(&self) -> Duration {
        self.slowdown_delay
    }

    pub(crate) fn stop_delay_value(&self) -> Duration {
        self.stop_delay
    }
}
/// Limits on follow-up compactions scheduled after a compaction completes.
#[derive(Clone, Debug)]
pub struct CascadeConfig {
    max_follow_ups: usize,
    cooldown: Duration,
}

impl Default for CascadeConfig {
    /// One follow-up with a 500 ms cooldown.
    fn default() -> Self {
        Self::new(1, Duration::from_millis(500))
    }
}

impl CascadeConfig {
    /// Build from explicit values; no clamping is applied.
    #[must_use]
    pub fn new(max_follow_ups: usize, cooldown: Duration) -> Self {
        CascadeConfig {
            cooldown,
            max_follow_ups,
        }
    }

    pub(crate) fn max_follow_ups(&self) -> usize {
        self.max_follow_ups
    }

    pub(crate) fn cooldown(&self) -> Duration {
        self.cooldown
    }
}
/// Bounds for the retry delay used after compare-and-swap conflicts.
///
/// Invariants: `base_delay` is non-zero and `max_delay >= base_delay`.
#[derive(Clone, Debug)]
pub struct CasBackoffConfig {
    base_delay: Duration,
    max_delay: Duration,
}

impl Default for CasBackoffConfig {
    /// 50 ms base delay, capped at 1 s.
    fn default() -> Self {
        Self::new(Duration::from_millis(50), Duration::from_secs(1))
    }
}

impl CasBackoffConfig {
    /// Build from explicit bounds.
    ///
    /// A zero `base_delay` becomes 1 ms, and `max_delay` is raised to at
    /// least `base_delay`.
    #[must_use]
    pub fn new(base_delay: Duration, max_delay: Duration) -> Self {
        let base = Some(base_delay)
            .filter(|delay| !delay.is_zero())
            .unwrap_or(Duration::from_millis(1));
        let cap = max_delay.max(base);
        Self {
            base_delay: base,
            max_delay: cap,
        }
    }

    pub(crate) fn base_delay(&self) -> Duration {
        self.base_delay
    }

    pub(crate) fn max_delay(&self) -> Duration {
        self.max_delay
    }
}
/// Minor-compaction tuning.
///
/// `segment_threshold` is passed to `MinorCompactor::with_id_allocator`
/// (presumably the number of sealed segments that triggers a flush — confirm).
/// `target_level` is the level the produced SSTs are written to.
#[derive(Clone, Debug)]
struct MinorCompactionOptions {
    segment_threshold: usize,
    target_level: usize,
}

impl Default for MinorCompactionOptions {
    fn default() -> Self {
        const DEFAULT_SEGMENT_THRESHOLD: usize = 4;
        const DEFAULT_TARGET_LEVEL: usize = 0;
        Self {
            target_level: DEFAULT_TARGET_LEVEL,
            segment_threshold: DEFAULT_SEGMENT_THRESHOLD,
        }
    }
}
impl<FS> DbBuilder<StorageConfig<FS>>
where
FS: DynFs + FusioCas + Clone + MaybeSend + MaybeSync + 'static,
{
#[must_use]
pub(crate) fn wal_config(mut self, overrides: WalConfig) -> Self {
if let Some(ref mut existing) = self.state.wal_config {
existing.merge(overrides);
} else if self.state.durability.is_durable() {
self.state.wal_config = Some(overrides);
}
self
}
#[must_use]
pub fn wal_segment_bytes(mut self, max_bytes: usize) -> Self {
if let Some(ref mut cfg) = self.state.wal_config {
cfg.segment_max_bytes = Some(max_bytes);
}
self
}
/// Override the WAL sync policy.
///
/// No-op when the storage carries no WAL settings (volatile storage).
#[must_use]
pub fn wal_sync_policy(self, policy: WalSyncPolicy) -> Self {
    let mut builder = self;
    if let Some(wal) = builder.state.wal_config_mut() {
        wal.sync = Some(policy);
    }
    builder
}
#[must_use]
pub fn wal_flush_interval(mut self, interval: Duration) -> Self {
if let Some(ref mut cfg) = self.state.wal_config {
cfg.flush_interval = Some(interval);
}
self
}
#[must_use]
pub fn wal_retention_bytes(mut self, retention: Option<usize>) -> Self {
if let Some(ref mut cfg) = self.state.wal_config {
cfg.retention_bytes = Some(retention);
}
self
}
fn minor_compaction_state_for_table(
cfg: &MinorCompactionOptions,
layout: &StorageLayout<FS>,
id_allocator: Arc<AtomicU64>,
schema: SchemaRef,
extractor: Arc<dyn KeyProjection>,
) -> Result<MinorCompactionState, DbBuildError>
where
FS: DynFs + FusioCas + 'static,
{
Self::build_minor_compaction_state(layout, cfg, id_allocator, schema, extractor)
}
fn compaction_worker_state_for_table(
layout: &StorageLayout<FS>,
id_allocator: Arc<AtomicU64>,
schema: SchemaRef,
extractor: Arc<dyn KeyProjection>,
) -> Result<(Arc<SsTableConfig>, Arc<AtomicU64>), DbBuildError> {
let route = layout.sst_route()?;
let config = Arc::new(
SsTableConfig::new(schema, route.fs, route.path.clone()).with_key_extractor(extractor),
);
Ok((config, id_allocator))
}
async fn sstable_id_allocator_for_table<E>(
manifest: &TonboManifest<FS, E>,
table_meta: &TableMeta,
) -> Result<Arc<AtomicU64>, DbBuildError>
where
E: Executor + Timer + Clone + 'static,
FS: ManifestFs<E>,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
let snapshot = manifest
.snapshot_latest_with_fallback(table_meta.table_id, table_meta)
.await
.map_err(DbBuildError::Manifest)?;
let next_id = match snapshot.latest_version.as_ref() {
Some(version) if !version.ssts().is_empty() => {
next_sstable_id(std::slice::from_ref(version))
}
_ => {
let versions = manifest
.list_versions(table_meta.table_id, 0)
.await
.map_err(DbBuildError::Manifest)?;
next_sstable_id(&versions)
}
};
Ok(Arc::new(AtomicU64::new(next_id)))
}
async fn recover_or_init_with_executor<E>(
self,
executor: Arc<E>,
) -> Result<DB<FS, E>, DbBuildError>
where
E: Executor + Timer + Clone + 'static,
FS: ManifestFs<E>,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
let DbBuilder {
mode_config,
state,
compaction_options,
minor_compaction,
seal_policy,
} = self;
state.prepare().await?;
let layout = state.layout()?;
let mut wal_cfg = RuntimeWalConfig::default();
layout.apply_wal_defaults(&mut wal_cfg)?;
if let Some(overrides) = state.wal_config() {
overrides.apply(&mut wal_cfg);
}
if wal_segments_exist(&wal_cfg).await? {
let manifest_init = ManifestBootstrap::new(&layout);
let table_name = state
.table_name()
.cloned()
.unwrap_or_else(|| DEFAULT_TABLE_NAME.to_string());
let table_definition = table_definition(&mode_config, &table_name);
let file_ids = FileIdGenerator::default();
let manifest = manifest_init
.init_manifest(executor.as_ref().clone())
.await?;
let table_meta = manifest
.register_table(&file_ids, &table_definition)
.await
.map_err(DbBuildError::Manifest)?;
let manifest_table = table_meta.table_id;
let fs_dyn = layout.dyn_fs();
let sstable_schema = mode_config.schema();
let sstable_extractor = Arc::clone(&mode_config.extractor);
let sstable_id_allocator = if minor_compaction.is_some() || compaction_options.is_some()
{
Some(Self::sstable_id_allocator_for_table(&manifest, &table_meta).await?)
} else {
None
};
let minor_compaction_state = if let (Some(cfg), Some(id_allocator)) =
(minor_compaction.as_ref(), sstable_id_allocator.as_ref())
{
Some(Self::minor_compaction_state_for_table(
cfg,
&layout,
Arc::clone(id_allocator),
Arc::clone(&sstable_schema),
Arc::clone(&sstable_extractor),
)?)
} else {
None
};
let compaction_state = if let (Some(_), Some(id_allocator)) =
(compaction_options.as_ref(), sstable_id_allocator.as_ref())
{
Some(Self::compaction_worker_state_for_table(
&layout,
Arc::clone(id_allocator),
Arc::clone(&sstable_schema),
Arc::clone(&sstable_extractor),
)?)
} else {
None
};
let mut inner = DbInner::recover_with_wal_with_manifest(
mode_config,
Arc::clone(&executor),
fs_dyn,
layout.sst_route()?.path,
wal_cfg.clone(),
manifest,
manifest_table,
table_meta,
)
.await
.map_err(DbBuildError::Mode)?;
inner.minor_compaction = minor_compaction_state;
if let Some(ref policy) = seal_policy {
inner.set_seal_policy(Arc::clone(policy));
}
if let Some(options) = compaction_options.as_ref() {
inner.l0_backpressure = options.backpressure().cloned();
inner.cas_backoff = options.cas_backoff_config().clone();
inner.compaction_metrics = options.metrics();
}
inner.enable_wal(wal_cfg).await?;
if let (Some(options), Some((sst_config, id_allocator))) =
(compaction_options, compaction_state)
{
let planner = options.strategy.clone().build();
let mut exec = LocalCompactionExecutor::with_id_allocator(
Arc::clone(&sst_config),
id_allocator,
);
if let Some(max_rows) = options.max_output_rows {
exec = exec.with_max_output_rows(max_rows);
}
if let Some(max_bytes) = options.max_output_bytes {
exec = exec.with_max_output_bytes(max_bytes);
}
let driver = Arc::new(inner.compaction_driver());
let worker_config = CompactionWorkerConfig::new(
options.effective_tick(),
options.effective_queue_capacity(),
options.effective_concurrency(),
options.cascade_config().clone(),
);
let handle =
driver.spawn_worker(Arc::clone(&inner.executor), planner, exec, worker_config);
inner.compaction_worker = Some(handle);
inner.kick_compaction_worker();
}
Ok(DB::from_inner(Arc::new(inner)))
} else {
DbBuilder {
mode_config,
state,
compaction_options,
minor_compaction,
seal_policy,
}
.build_with_layout(executor, layout)
.await
}
}
}
/// Returns the SST id to hand out next: one past the largest SST id referenced by any
/// level of any of `versions`, or `1` when no SST is referenced at all.
///
/// The increment saturates, so a largest id of `u64::MAX` yields `u64::MAX` again.
fn next_sstable_id(versions: &[VersionState]) -> u64 {
    // Walk every level of every version and collect the raw id of each SST entry.
    let referenced_ids = versions
        .iter()
        .flat_map(|version| version.ssts().iter())
        .flat_map(|level| level.iter())
        .map(|entry| entry.sst_id().raw());
    match referenced_ids.max() {
        Some(highest) => highest.saturating_add(1),
        None => 1,
    }
}