use std::{
collections::{BTreeMap, HashMap},
sync::{
Arc, Mutex, MutexGuard,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use aisle::Pruner;
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, SchemaRef};
use fusio::{
DynFs,
executor::{Executor, Instant as ExecInstant, Timer},
mem::fs::InMemoryFs,
path::Path,
};
use futures::lock::Mutex as AsyncMutex;
use lockable::LockableHashMap;
use wal::SealState;
use crate::compaction::{CompactionHandle, MinorCompactor, metrics::CompactionBackpressureSignal};
mod builder;
mod compaction;
mod error;
mod scan;
#[cfg(all(test, feature = "tokio"))]
mod tests;
mod wal;
pub use builder::{
AwsCreds, AwsCredsError, CasBackoffConfig, CascadeConfig, CompactionOptions, DbBuildError,
DbBuilder, L0BackpressureConfig, ObjectSpec, S3Spec, WalConfig, wal_tuning,
};
pub use error::DBError;
pub use scan::{DEFAULT_SCAN_BATCH_ROWS, ScanBuilder, ScanSetupProfile};
pub(crate) use wal::{TxnWalPublishContext, WalFrameRange};
#[cfg(not(test))]
use crate::observability::log_warn;
pub use crate::{
compaction::{
metrics::{
CompactionMetrics, CompactionMetricsSnapshot, SstGcInspection, SstGcStatus,
SstSweepSummary,
},
planner::{CompactionStrategy, LeveledPlannerConfig},
},
inmem::policy::{BatchesThreshold, NeverSeal, SealPolicy},
mode::DynModeConfig,
query::{Expr, ScalarValue},
schema::SchemaBuilder,
transaction::{CommitAckMode, Snapshot, SnapshotError, Transaction},
wal::WalSyncPolicy,
};
use crate::{
extractor::{KeyExtractError, KeyProjection},
id::FileId,
inmem::mutable::DynMem,
key::KeyOwned,
manifest::{
ManifestError, ManifestFs, SstEntry, TableId, TableMeta, TonboManifest, VersionEdit,
VersionState, WalSegmentRef,
},
mvcc::{CommitClock, ReadView, Timestamp},
ondisk::{
bloom::{BloomFilterCache, default_bloom_cache},
metadata::{ParquetMetadataCache, default_parquet_metadata_cache},
sstable::{
SsTable, SsTableBuilder, SsTableConfig, SsTableDescriptor, SsTableError,
manifest_storage_path,
},
},
transaction::{Snapshot as TxSnapshot, TransactionDurability, TransactionError},
wal::{
WalConfig as RuntimeWalConfig, WalHandle, frame::INITIAL_FRAME_SEQ, manifest_ext,
replay::Replayer, state::WalStateHandle,
},
};
/// Shared, reference-counted handle to the database internals.
pub(crate) type DynDbHandle<FS, E> = Arc<DbInner<FS, E>>;
/// `String`-keyed map of shared [`Pruner`]s; this is the payload type of `DbInner::pruner_cache`.
type PrunerCache = HashMap<String, Arc<Pruner>>;
/// Summary of one manifest version, as produced by `version_from_state`.
#[derive(Debug, Clone)]
pub struct Version {
/// Commit timestamp of the version.
pub timestamp: Timestamp,
/// Number of SST entries summed over every level.
pub sst_count: usize,
/// Saturating sum of the byte sizes of SSTs that carry stats; entries without stats add nothing.
pub sst_bytes: u64,
/// Number of levels that hold at least one SST (empty levels are not counted).
pub level_count: usize,
}
/// Timing counters for one pass through the write path.
///
/// All fields carry an `_ns` suffix (nanoseconds, per the naming). Each one is readable through
/// the accessor of the same name, and two profiles can be merged with
/// [`WritePathProfile::saturating_add`].
#[derive(Debug, Clone, Copy, Default)]
pub struct WritePathProfile {
    partition_ns: u64,
    wal_append_submit_ns: u64,
    wal_append_wait_ns: u64,
    wal_append_ns: u64,
    wal_commit_submit_ns: u64,
    wal_commit_wait_ns: u64,
    wal_commit_ns: u64,
    mutable_insert_ns: u64,
    seal_ns: u64,
    minor_compaction_ns: u64,
    total_ns: u64,
}

impl WritePathProfile {
    /// Returns the `partition_ns` counter.
    pub fn partition_ns(&self) -> u64 {
        self.partition_ns
    }

    /// Returns the `wal_append_submit_ns` counter.
    pub fn wal_append_submit_ns(&self) -> u64 {
        self.wal_append_submit_ns
    }

    /// Returns the `wal_append_wait_ns` counter.
    pub fn wal_append_wait_ns(&self) -> u64 {
        self.wal_append_wait_ns
    }

    /// Returns the `wal_append_ns` counter.
    pub fn wal_append_ns(&self) -> u64 {
        self.wal_append_ns
    }

    /// Returns the `wal_commit_submit_ns` counter.
    pub fn wal_commit_submit_ns(&self) -> u64 {
        self.wal_commit_submit_ns
    }

    /// Returns the `wal_commit_wait_ns` counter.
    pub fn wal_commit_wait_ns(&self) -> u64 {
        self.wal_commit_wait_ns
    }

    /// Returns the `wal_commit_ns` counter.
    pub fn wal_commit_ns(&self) -> u64 {
        self.wal_commit_ns
    }

    /// Returns the `mutable_insert_ns` counter.
    pub fn mutable_insert_ns(&self) -> u64 {
        self.mutable_insert_ns
    }

    /// Returns the `seal_ns` counter.
    pub fn seal_ns(&self) -> u64 {
        self.seal_ns
    }

    /// Returns the `minor_compaction_ns` counter.
    pub fn minor_compaction_ns(&self) -> u64 {
        self.minor_compaction_ns
    }

    /// Returns the `total_ns` counter.
    pub fn total_ns(&self) -> u64 {
        self.total_ns
    }

    /// Adds two profiles field by field, clamping each counter at `u64::MAX` instead of
    /// overflowing.
    #[must_use]
    pub fn saturating_add(self, other: Self) -> Self {
        let sum = u64::saturating_add;
        Self {
            partition_ns: sum(self.partition_ns, other.partition_ns),
            wal_append_submit_ns: sum(self.wal_append_submit_ns, other.wal_append_submit_ns),
            wal_append_wait_ns: sum(self.wal_append_wait_ns, other.wal_append_wait_ns),
            wal_append_ns: sum(self.wal_append_ns, other.wal_append_ns),
            wal_commit_submit_ns: sum(self.wal_commit_submit_ns, other.wal_commit_submit_ns),
            wal_commit_wait_ns: sum(self.wal_commit_wait_ns, other.wal_commit_wait_ns),
            wal_commit_ns: sum(self.wal_commit_ns, other.wal_commit_ns),
            mutable_insert_ns: sum(self.mutable_insert_ns, other.mutable_insert_ns),
            seal_ns: sum(self.seal_ns, other.seal_ns),
            minor_compaction_ns: sum(self.minor_compaction_ns, other.minor_compaction_ns),
            total_ns: sum(self.total_ns, other.total_ns),
        }
    }
}
/// Condenses a manifest [`VersionState`] into the public [`Version`] summary.
///
/// Walks the levels once, counting entries, non-empty levels, and the bytes reported by
/// entries that carry stats. Byte totals saturate at `u64::MAX`.
fn version_from_state(state: VersionState) -> Version {
    let mut sst_count = 0usize;
    let mut level_count = 0usize;
    let mut sst_bytes = 0u64;
    for level in state.ssts.iter() {
        sst_count += level.len();
        if !level.is_empty() {
            level_count += 1;
        }
        for entry in level {
            // Entries without stats contribute nothing to the byte total.
            if let Some(stats) = entry.stats() {
                let bytes = u64::try_from(stats.bytes).unwrap_or(u64::MAX);
                sst_bytes = sst_bytes.saturating_add(bytes);
            }
        }
    }
    Version {
        timestamp: state.commit_timestamp,
        sst_count,
        sst_bytes,
        level_count,
    }
}
/// Reference counts of manifest versions that live snapshots are currently pinning.
#[derive(Debug, Default)]
pub(crate) struct SnapshotPinRegistry {
    /// Pin count per manifest commit timestamp; a key is present only while its count is >= 1.
    pins: Mutex<BTreeMap<Timestamp, usize>>,
}

impl SnapshotPinRegistry {
    /// Registers one more pin on `manifest_ts` and returns a guard that undoes it when the last
    /// clone of the guard is dropped.
    fn pin(self: &Arc<Self>, manifest_ts: Timestamp) -> SnapshotPinGuard {
        {
            let mut pins = self
                .pins
                .lock()
                .expect("snapshot pin registry mutex poisoned");
            pins.entry(manifest_ts)
                .and_modify(|holders| *holders = holders.saturating_add(1))
                .or_insert(1);
        }
        let inner = SnapshotPinGuardInner {
            registry: Arc::clone(self),
            manifest_ts,
        };
        SnapshotPinGuard {
            inner: Arc::new(inner),
        }
    }

    /// Returns the currently pinned timestamps in ascending order.
    pub(crate) fn active_versions(&self) -> Vec<Timestamp> {
        let pins = self
            .pins
            .lock()
            .expect("snapshot pin registry mutex poisoned");
        pins.keys().copied().collect()
    }

    /// Drops one pin on `manifest_ts`, removing the entry when it was the last one.
    /// Unknown timestamps are ignored.
    fn release(&self, manifest_ts: Timestamp) {
        let mut pins = self
            .pins
            .lock()
            .expect("snapshot pin registry mutex poisoned");
        if let std::collections::btree_map::Entry::Occupied(mut slot) = pins.entry(manifest_ts) {
            if *slot.get() <= 1 {
                slot.remove();
            } else {
                *slot.get_mut() -= 1;
            }
        }
    }
}

/// Cloneable handle that keeps a manifest version pinned; clones share a single pin.
#[derive(Debug, Clone)]
pub(crate) struct SnapshotPinGuard {
    // Held only for its `Drop` side effect.
    #[allow(dead_code)]
    inner: Arc<SnapshotPinGuardInner>,
}

/// Shared state behind [`SnapshotPinGuard`]; releases the pin when dropped.
#[derive(Debug)]
struct SnapshotPinGuardInner {
    registry: Arc<SnapshotPinRegistry>,
    manifest_ts: Timestamp,
}

impl Drop for SnapshotPinGuardInner {
    fn drop(&mut self) {
        let Self {
            registry,
            manifest_ts,
        } = self;
        registry.release(*manifest_ts);
    }
}
/// Minor compactor plus the SST config it writes with, guarded so only one run executes at a
/// time.
struct MinorCompactionState {
    compactor: MinorCompactor,
    config: Arc<SsTableConfig>,
    /// Serializes calls to [`MinorCompactionState::maybe_compact`].
    lock: AsyncMutex<()>,
}

impl MinorCompactionState {
    /// Bundles a compactor with its SST configuration.
    fn new(compactor: MinorCompactor, config: Arc<SsTableConfig>) -> Self {
        let lock = AsyncMutex::new(());
        Self {
            compactor,
            config,
            lock,
        }
    }

    /// Runs the compactor against `db` while holding the internal lock, forwarding whatever the
    /// compactor returns.
    async fn maybe_compact<FS, E>(
        &self,
        db: &DbInner<FS, E>,
    ) -> Result<Option<SsTable>, SsTableError>
    where
        FS: ManifestFs<E>,
        E: Executor + Timer + Clone,
        <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
    {
        let _serialized = self.lock.lock().await;
        let config = Arc::clone(&self.config);
        self.compactor.maybe_compact(db, config).await
    }
}
/// Level-0 occupancy figures consumed by the backpressure decision.
#[derive(Clone, Copy, Debug)]
struct L0Stats {
    /// Number of level-0 entries.
    file_count: usize,
    /// Total level-0 bytes, or `None` when the total is unknown.
    total_bytes: Option<usize>,
}

/// Most recently observed [`L0Stats`] and the instant it was recorded.
struct L0StatsCache {
    stats: L0Stats,
    last_refresh: ExecInstant,
    /// `false` until the first `update`; an invalid cache is never considered fresh.
    valid: bool,
}

impl L0StatsCache {
    /// Creates an empty, not-yet-valid cache stamped with `now`.
    fn new(now: ExecInstant) -> Self {
        let stats = L0Stats {
            file_count: 0,
            total_bytes: Some(0),
        };
        Self {
            stats,
            last_refresh: now,
            valid: false,
        }
    }

    /// Returns `true` when the cache has been populated and is younger than `refresh_interval`.
    fn is_fresh(&self, now: ExecInstant, refresh_interval: Duration) -> bool {
        if !self.valid {
            return false;
        }
        let age = now.saturating_duration_since(self.last_refresh);
        age < refresh_interval
    }

    /// Returns a copy of the cached stats, whether or not they are fresh.
    fn snapshot(&self) -> L0Stats {
        self.stats
    }

    /// Stores `stats`, stamps them with `now`, and marks the cache valid.
    fn update(&mut self, now: ExecInstant, stats: L0Stats) {
        *self = Self {
            stats,
            last_refresh: now,
            valid: true,
        };
    }
}
/// Scope guard that resets a borrowed flag to `false` when dropped.
///
/// Constructing the guard does not touch the flag; only `Drop` writes to it.
struct L0StatsRefreshGuard<'flag> {
    flag: &'flag AtomicBool,
}

impl<'flag> L0StatsRefreshGuard<'flag> {
    /// Wraps `flag` without modifying it.
    fn new(flag: &'flag AtomicBool) -> Self {
        L0StatsRefreshGuard { flag }
    }
}

impl Drop for L0StatsRefreshGuard<'_> {
    fn drop(&mut self) {
        let Self { flag } = self;
        flag.store(false, Ordering::Release);
    }
}
/// Outcome of evaluating level-0 backpressure (see `L0BackpressureConfig::decision`).
#[derive(Clone, Copy, Debug)]
enum BackpressureDecision {
/// No threshold was reached.
Proceed,
/// A slowdown threshold was reached; carries the configured slowdown delay.
Slowdown(Duration),
/// A stop threshold was reached; carries the configured stop delay.
Stall(Duration),
}
impl L0BackpressureConfig {
    /// Maps level-0 `stats` to a backpressure decision.
    ///
    /// Stop thresholds take precedence over slowdown thresholds. A byte threshold only applies
    /// when both the limit and the observed total are known.
    fn decision(&self, stats: L0Stats) -> BackpressureDecision {
        /// `true` when both values are known and `total` has reached `limit`.
        fn bytes_reached(limit: Option<usize>, total: Option<usize>) -> bool {
            matches!((limit, total), (Some(limit), Some(total)) if total >= limit)
        }

        let files = stats.file_count;
        let total = stats.total_bytes;
        if files >= self.stop_files() || bytes_reached(self.stop_bytes_limit(), total) {
            BackpressureDecision::Stall(self.stop_delay_value())
        } else if files >= self.slowdown_files()
            || bytes_reached(self.slowdown_bytes_limit(), total)
        {
            BackpressureDecision::Slowdown(self.slowdown_delay_value())
        } else {
            BackpressureDecision::Proceed
        }
    }
}
/// Public database handle: a thin wrapper around a shared `Arc<DbInner>`.
///
/// Cloning the handle clones the `Arc`, so all clones operate on the same internals.
pub struct DB<FS, E>
where
FS: ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
// Shared internals; every public method delegates to this.
inner: Arc<DbInner<FS, E>>,
}
impl<FS, E> Clone for DB<FS, E>
where
FS: ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<FS, E> DB<FS, E>
where
FS: ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
#[cfg(test)]
#[doc(hidden)]
pub fn from_inner(inner: Arc<DbInner<FS, E>>) -> Self {
Self { inner }
}
#[cfg(not(test))]
pub(crate) fn from_inner(inner: Arc<DbInner<FS, E>>) -> Self {
Self { inner }
}
/// Borrows the shared internals (test builds only).
#[cfg(test)]
#[doc(hidden)]
pub fn inner(&self) -> &Arc<DbInner<FS, E>> {
    let Self { inner } = self;
    inner
}

/// Unwraps the internals (test builds only).
///
/// # Panics
///
/// Panics if any other handle to the same internals is still alive.
#[cfg(test)]
#[doc(hidden)]
pub fn into_inner(self) -> DbInner<FS, E> {
    match Arc::try_unwrap(self.inner) {
        Ok(inner) => inner,
        Err(_) => panic!("DB has multiple references"),
    }
}
/// Opens a transaction over a fresh snapshot.
///
/// The transaction is `Durable` when a WAL handle is present and `Volatile` otherwise.
///
/// # Errors
///
/// Propagates a failure to take the snapshot.
pub async fn begin_transaction(&self) -> Result<Transaction<FS, E>, TransactionError> {
    let handle = Arc::clone(&self.inner);
    let snapshot = handle.begin_snapshot().await?;
    let durability = match handle.wal_handle() {
        Some(_) => TransactionDurability::Durable,
        None => TransactionDurability::Volatile,
    };
    // Copy everything needed out of `handle` before it is moved into the transaction.
    let commit_ack_mode = handle.commit_ack_mode;
    let extractor = Arc::clone(handle.extractor());
    let delete_schema = handle.delete_schema.clone();
    let schema = handle.schema.clone();
    let txn = Transaction::new(
        handle,
        schema,
        delete_schema,
        extractor,
        snapshot,
        commit_ack_mode,
        durability,
    );
    Ok(txn)
}
pub fn builder(config: DynModeConfig) -> DbBuilder {
DbBuilder::new(config)
}
pub async fn begin_snapshot(&self) -> Result<TxSnapshot, SnapshotError> {
self.inner.begin_snapshot().await
}
/// Ingests one record batch.
///
/// Test builds run minor compaction inline as part of the call. Other builds ingest without
/// it and then schedule a background minor compaction.
///
/// # Errors
///
/// Ingest failures are wrapped in `DBError::Key`.
pub async fn ingest(&self, batch: RecordBatch) -> Result<(), DBError> {
    #[cfg(test)]
    {
        let outcome = self.inner.ingest(batch).await;
        outcome.map_err(DBError::Key)
    }
    #[cfg(not(test))]
    {
        let db = &self.inner;
        match db.ingest_without_minor_compaction(batch).await {
            Ok(()) => {
                DbInner::schedule_background_minor_compaction(Arc::clone(db));
                Ok(())
            }
            Err(err) => Err(DBError::Key(err)),
        }
    }
}
/// Ingests a record batch together with a `tombstones` vector.
///
/// Test builds run minor compaction inline; other builds schedule it in the background once
/// the ingest has succeeded.
///
/// # Errors
///
/// Ingest failures are wrapped in `DBError::Key`.
pub async fn ingest_with_tombstones(
    &self,
    batch: RecordBatch,
    tombstones: Vec<bool>,
) -> Result<(), DBError> {
    #[cfg(test)]
    {
        let outcome = self.inner.ingest_with_tombstones(batch, tombstones).await;
        outcome.map_err(DBError::Key)
    }
    #[cfg(not(test))]
    {
        let db = &self.inner;
        match db
            .ingest_with_tombstones_without_minor_compaction(batch, tombstones)
            .await
        {
            Ok(()) => {
                DbInner::schedule_background_minor_compaction(Arc::clone(db));
                Ok(())
            }
            Err(err) => Err(DBError::Key(err)),
        }
    }
}
/// Same as `ingest_with_tombstones`, but returns the [`WritePathProfile`] reported by the
/// write.
///
/// Test builds run minor compaction inline; other builds schedule it in the background once
/// the ingest has succeeded.
///
/// # Errors
///
/// Ingest failures are wrapped in `DBError::Key`.
pub async fn ingest_with_tombstones_with_profile(
    &self,
    batch: RecordBatch,
    tombstones: Vec<bool>,
) -> Result<WritePathProfile, DBError> {
    #[cfg(test)]
    {
        let outcome = self
            .inner
            .ingest_with_tombstones_with_profile(batch, tombstones)
            .await;
        outcome.map_err(DBError::Key)
    }
    #[cfg(not(test))]
    {
        let db = &self.inner;
        match db
            .ingest_with_tombstones_with_profile_without_minor_compaction(batch, tombstones)
            .await
        {
            Ok(profile) => {
                DbInner::schedule_background_minor_compaction(Arc::clone(db));
                Ok(profile)
            }
            Err(err) => Err(DBError::Key(err)),
        }
    }
}
pub fn scan(&self) -> ScanBuilder<'_, FS, E> {
ScanBuilder::new(&self.inner)
}
#[cfg(test)]
pub fn has_compaction_worker(&self) -> bool {
self.inner.has_compaction_worker()
}
pub async fn snapshot_at(&self, timestamp: Timestamp) -> Result<TxSnapshot, SnapshotError> {
self.inner.snapshot_at(timestamp).await
}
pub async fn list_versions(&self, limit: usize) -> Result<Vec<Version>, ManifestError> {
self.inner.list_versions(limit).await
}
#[doc(hidden)]
pub async fn sweep_sst_objects(&self) -> Result<SstSweepSummary, DBError> {
self.inner
.sweep_manifest_ssts()
.await
.map_err(DBError::from)
}
#[doc(hidden)]
pub fn compaction_metrics_snapshot(&self) -> Option<CompactionMetricsSnapshot> {
self.inner.compaction_metrics_snapshot()
}
#[doc(hidden)]
pub async fn sst_gc_status(&self) -> Result<Option<SstGcStatus>, DBError> {
self.inner.sst_gc_status().await.map_err(DBError::from)
}
#[doc(hidden)]
pub async fn inspect_sst_gc_plan(&self) -> Result<Option<SstGcInspection>, DBError> {
self.inner
.inspect_sst_gc_plan()
.await
.map_err(DBError::from)
}
}
impl<E> DB<InMemoryFs, E>
where
    E: Executor + Timer + Clone + 'static,
{
    /// Builds an in-memory database for tests.
    #[cfg(test)]
    pub(crate) async fn new(
        config: DynModeConfig,
        executor: Arc<E>,
    ) -> Result<Self, KeyExtractError> {
        DbInner::new(config, executor)
            .await
            .map(|inner| Self::from_inner(Arc::new(inner)))
    }

    /// Builds an in-memory database for tests, then installs `policy` as its seal policy.
    #[cfg(test)]
    pub(crate) async fn new_with_policy(
        config: DynModeConfig,
        executor: Arc<E>,
        policy: Arc<dyn crate::inmem::policy::SealPolicy + Send + Sync>,
    ) -> Result<Self, KeyExtractError> {
        let shared = {
            let mut inner = DbInner::new(config, executor).await?;
            inner.set_seal_policy(policy);
            Arc::new(inner)
        };
        Ok(Self::from_inner(shared))
    }
}
/// Shared lockable map keyed by `K` with unit values; `DbInner::key_locks` exposes one of these.
type LockMap<K> = Arc<LockableHashMap<K, ()>>;
/// Re-wraps a [`ManifestError`] as a `KeyExtractError::Arrow` compute error carrying its
/// display text.
fn manifest_error_as_key_extract(err: ManifestError) -> KeyExtractError {
    let message = format!("manifest error: {err}");
    KeyExtractError::Arrow(ArrowError::ComputeError(message))
}
/// Database internals shared behind `Arc` by every `DB` handle (test-build definition).
///
/// This copy is public and hidden from docs. The non-test definition declares the same fields
/// with `pub(crate)` visibility on the struct.
#[cfg(test)]
#[doc(hidden)]
pub struct DbInner<FS, E>
where
FS: ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
/// Schema that ingested batches must match (checked in `ingest_impl`).
pub(crate) schema: SchemaRef,
pub(crate) delete_schema: SchemaRef,
pub(crate) commit_ack_mode: CommitAckMode,
mem: DynMem,
/// Sealed memtables, their WAL ranges and the last seal instant; guarded by a std mutex.
seal_state: Mutex<SealState>,
policy: Arc<dyn SealPolicy + Send + Sync>,
pub(crate) executor: Arc<E>,
fs: Arc<dyn DynFs>,
sst_root: Path,
wal: Option<WalHandle<E>>,
wal_config: Option<RuntimeWalConfig>,
table_meta: TableMeta,
commit_clock: CommitClock,
manifest: TonboManifest<FS, E>,
manifest_table: TableId,
bloom_cache: Arc<E::Mutex<BloomFilterCache>>,
pruner_cache: Arc<E::Mutex<PrunerCache>>,
metadata_cache: Arc<E::Mutex<ParquetMetadataCache>>,
/// Manifest versions pinned by live snapshots.
snapshot_pins: Arc<SnapshotPinRegistry>,
mutable_wal_range: Arc<Mutex<Option<WalFrameRange>>>,
// Exposed through `key_locks()`.
_key_locks: LockMap<KeyOwned>,
compaction_worker: Option<CompactionHandle<E>>,
compaction_metrics: Option<Arc<crate::compaction::metrics::CompactionMetrics>>,
/// Held for the whole of `flush_immutables_with_descriptor`, so flushes run one at a time.
flush_lock: AsyncMutex<()>,
minor_compaction: Option<MinorCompactionState>,
// NOTE: this struct is itself `#[cfg(test)]`, so the two `#[cfg(not(test))]` fields below are
// never compiled into it.
#[cfg(not(test))]
minor_compaction_pending: AtomicBool,
#[cfg(not(test))]
minor_compaction_rerun: AtomicBool,
/// Level-0 backpressure thresholds; `None` disables backpressure.
l0_backpressure: Option<L0BackpressureConfig>,
/// Cached level-0 stats used by `l0_stats`.
l0_stats_cache: AsyncMutex<L0StatsCache>,
/// Set while one caller is refreshing `l0_stats_cache` from the manifest.
l0_stats_refreshing: AtomicBool,
cas_backoff: CasBackoffConfig,
}
/// Database internals shared behind `Arc` by every `DB` handle (non-test definition).
#[cfg(not(test))]
pub(crate) struct DbInner<FS, E>
where
FS: ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
/// Schema that ingested batches must match (checked in `ingest_impl`).
pub(crate) schema: SchemaRef,
pub(crate) delete_schema: SchemaRef,
pub(crate) commit_ack_mode: CommitAckMode,
mem: DynMem,
/// Sealed memtables, their WAL ranges and the last seal instant; guarded by a std mutex.
seal_state: Mutex<SealState>,
policy: Arc<dyn SealPolicy + Send + Sync>,
pub(crate) executor: Arc<E>,
fs: Arc<dyn DynFs>,
sst_root: Path,
wal: Option<WalHandle<E>>,
wal_config: Option<RuntimeWalConfig>,
table_meta: TableMeta,
commit_clock: CommitClock,
manifest: TonboManifest<FS, E>,
manifest_table: TableId,
bloom_cache: Arc<E::Mutex<BloomFilterCache>>,
pruner_cache: Arc<E::Mutex<PrunerCache>>,
metadata_cache: Arc<E::Mutex<ParquetMetadataCache>>,
/// Manifest versions pinned by live snapshots.
snapshot_pins: Arc<SnapshotPinRegistry>,
mutable_wal_range: Arc<Mutex<Option<WalFrameRange>>>,
// Exposed through `key_locks()`.
_key_locks: LockMap<KeyOwned>,
compaction_worker: Option<CompactionHandle<E>>,
compaction_metrics: Option<Arc<crate::compaction::metrics::CompactionMetrics>>,
/// Held for the whole of `flush_immutables_with_descriptor`, so flushes run one at a time.
flush_lock: AsyncMutex<()>,
minor_compaction: Option<MinorCompactionState>,
/// `true` while a background minor-compaction task is claimed or running.
// The field-level cfg repeats the struct-level `#[cfg(not(test))]`.
#[cfg(not(test))]
minor_compaction_pending: AtomicBool,
/// Set by a scheduling request that found a task already pending; asks for another pass.
#[cfg(not(test))]
minor_compaction_rerun: AtomicBool,
/// Level-0 backpressure thresholds; `None` disables backpressure.
l0_backpressure: Option<L0BackpressureConfig>,
/// Cached level-0 stats used by `l0_stats`.
l0_stats_cache: AsyncMutex<L0StatsCache>,
/// Set while one caller is refreshing `l0_stats_cache` from the manifest.
l0_stats_refreshing: AtomicBool,
cas_backoff: CasBackoffConfig,
}
impl<FS, E> DbInner<FS, E>
where
FS: ManifestFs<E>,
E: Executor + Timer + Clone,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
/// Locks `seal_state`.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline]
fn seal_state_lock(&self) -> MutexGuard<'_, SealState> {
    let locked = self.seal_state.lock();
    locked.expect("seal_state mutex poisoned")
}

/// Key projection used by the mutable memtable.
pub(crate) fn extractor(&self) -> &Arc<dyn KeyProjection> {
    let mem = &self.mem;
    mem.extractor()
}

/// Projection the mutable memtable uses for deletes.
pub(crate) fn delete_extractor(&self) -> &Arc<dyn KeyProjection> {
    let mem = &self.mem;
    mem.delete_projection()
}

/// Returns a new handle to the shared bloom filter cache.
pub(crate) fn bloom_cache(&self) -> Arc<E::Mutex<BloomFilterCache>> {
    self.bloom_cache.clone()
}

/// Returns a new handle to the shared pruner cache.
pub(crate) fn pruner_cache(&self) -> Arc<E::Mutex<PrunerCache>> {
    self.pruner_cache.clone()
}

/// Returns a new handle to the shared Parquet metadata cache.
pub(crate) fn metadata_cache(&self) -> Arc<E::Mutex<ParquetMetadataCache>> {
    self.metadata_cache.clone()
}
/// Timestamps of the manifest versions currently pinned by snapshots.
#[allow(dead_code)]
pub(crate) fn active_snapshot_pins(&self) -> Vec<Timestamp> {
    let registry = &self.snapshot_pins;
    registry.active_versions()
}

/// Pins `manifest_ts` in the snapshot registry and returns the guard holding the pin.
fn pin_snapshot_version(&self, manifest_ts: Timestamp) -> SnapshotPinGuard {
    let registry = &self.snapshot_pins;
    registry.pin(manifest_ts)
}

/// Pins the latest version of `manifest_snapshot`, or returns `None` when it has none.
fn snapshot_pin_for_manifest(
    &self,
    manifest_snapshot: &crate::manifest::TableSnapshot,
) -> Option<SnapshotPinGuard> {
    let latest = manifest_snapshot.latest_version.as_ref()?;
    Some(self.pin_snapshot_version(latest.commit_timestamp()))
}
/// Test helper: read-only view of the mutable memtable.
#[cfg(all(test, feature = "tokio"))]
pub(crate) fn mem_read(&self) -> crate::inmem::mutable::memtable::TestMemRef<'_> {
    let mem = &self.mem;
    crate::inmem::mutable::memtable::TestMemRef(mem)
}

/// Test helper: replaces the mutable memtable with a new one of the given capacity, reusing
/// the current schema and projections.
#[cfg(all(test, feature = "tokio"))]
pub(crate) fn set_mem_capacity(&mut self, capacity: usize) {
    let schema = self.schema.clone();
    let extractor = Arc::clone(self.mem.extractor());
    let delete_projection = Arc::clone(self.mem.delete_projection());
    self.mem = DynMem::with_capacity(schema, extractor, delete_projection, capacity);
}

/// Test helper: seals the mutable memtable immediately.
///
/// # Panics
///
/// Panics if sealing fails.
#[cfg(all(test, feature = "tokio"))]
pub(crate) fn seal_mutable(
    &self,
) -> Option<crate::inmem::immutable::memtable::ImmutableMemTable> {
    let sealed = self.mem.seal_now();
    sealed.expect("seal should not fail")
}
/// Assembles a `DbInner` from already-built parts.
///
/// Everything not passed in starts from a default or empty state: no WAL handle, no
/// compaction worker or metrics, no minor compaction, no L0 backpressure, the default seal
/// policy, and fresh caches.
#[allow(clippy::too_many_arguments)]
fn from_components(
    schema: SchemaRef,
    delete_schema: SchemaRef,
    commit_ack_mode: CommitAckMode,
    mem: DynMem,
    fs: Arc<dyn DynFs>,
    sst_root: Path,
    manifest: TonboManifest<FS, E>,
    manifest_table: TableId,
    table_meta: TableMeta,
    wal_config: Option<RuntimeWalConfig>,
    executor: Arc<E>,
) -> Self {
    // Read the clock before `executor` is moved into the struct.
    let created_at = executor.now();
    let l0_stats_cache = AsyncMutex::new(L0StatsCache::new(created_at));
    let seal_state = Mutex::new(SealState::default());
    let policy = crate::inmem::policy::default_policy();
    let bloom_cache = default_bloom_cache::<E>();
    let pruner_cache = Arc::new(E::mutex(HashMap::new()));
    let metadata_cache = default_parquet_metadata_cache::<E>();
    Self {
        schema,
        delete_schema,
        commit_ack_mode,
        mem,
        seal_state,
        policy,
        executor,
        fs,
        sst_root,
        wal: None,
        wal_config,
        table_meta,
        commit_clock: CommitClock::default(),
        manifest,
        manifest_table,
        bloom_cache,
        pruner_cache,
        metadata_cache,
        snapshot_pins: Arc::new(SnapshotPinRegistry::default()),
        mutable_wal_range: Arc::new(Mutex::new(None)),
        _key_locks: Arc::new(LockableHashMap::new()),
        compaction_worker: None,
        compaction_metrics: None,
        flush_lock: AsyncMutex::new(()),
        minor_compaction: None,
        #[cfg(not(test))]
        minor_compaction_pending: AtomicBool::new(false),
        #[cfg(not(test))]
        minor_compaction_rerun: AtomicBool::new(false),
        l0_backpressure: None,
        l0_stats_cache,
        l0_stats_refreshing: AtomicBool::new(false),
        cas_backoff: CasBackoffConfig::default(),
    }
}
/// Crate-visible entry point for WAL recovery; forwards every argument unchanged to
/// `recover_with_wal_inner`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn recover_with_wal_with_manifest(
    config: DynModeConfig,
    executor: Arc<E>,
    fs: Arc<dyn DynFs>,
    sst_root: Path,
    wal_cfg: RuntimeWalConfig,
    manifest: TonboManifest<FS, E>,
    manifest_table: TableId,
    table_meta: TableMeta,
) -> Result<Self, KeyExtractError> {
    let recovery = Self::recover_with_wal_inner(
        config,
        executor,
        fs,
        sst_root,
        wal_cfg,
        manifest,
        manifest_table,
        table_meta,
    );
    recovery.await
}
/// Rebuilds a `DbInner` and replays the WAL into it.
///
/// Steps, in order:
/// 1. read the commit timestamp recorded in the WAL state store, when one is configured;
/// 2. build the database from `config` and the supplied parts;
/// 3. scan the WAL from the manifest's WAL floor;
/// 4. seed the mutable WAL range (none when nothing was scanned);
/// 5. replay the events and advance the commit clock past the newest known commit.
///
/// # Errors
///
/// Fails if loading the WAL state, building from `config`, scanning, or replaying fails.
#[allow(clippy::too_many_arguments)]
async fn recover_with_wal_inner(
    config: DynModeConfig,
    executor: Arc<E>,
    fs: Arc<dyn DynFs>,
    sst_root: Path,
    wal_cfg: RuntimeWalConfig,
    manifest: TonboManifest<FS, E>,
    manifest_table: TableId,
    table_meta: TableMeta,
) -> Result<Self, KeyExtractError> {
    let state_commit_hint = match wal_cfg.state_store.as_ref() {
        Some(store) => WalStateHandle::load(Arc::clone(store), &wal_cfg.dir)
            .await?
            .state()
            .commit_ts(),
        None => None,
    };

    let (schema, delete_schema, commit_ack_mode, mem) = config.build()?;
    let mut db = Self::from_components(
        schema,
        delete_schema,
        commit_ack_mode,
        mem,
        fs,
        sst_root,
        manifest,
        manifest_table,
        table_meta,
        Some(wal_cfg.clone()),
        executor,
    );
    db.set_wal_config(Some(wal_cfg.clone()));

    let wal_floor = db.manifest_wal_floor().await;
    let events = Replayer::new(wal_cfg)
        .scan_with_floor(wal_floor.as_ref())
        .await
        .map_err(KeyExtractError::from)?;

    // With events to replay, the range starts from the manifest floor when there is one and
    // from the initial frame sequence otherwise.
    let initial_range = if events.is_empty() {
        None
    } else {
        let (first, last) = match wal_floor.as_ref() {
            Some(floor) => (floor.first_frame(), floor.last_frame()),
            None => (INITIAL_FRAME_SEQ, INITIAL_FRAME_SEQ),
        };
        Some(WalFrameRange { first, last })
    };
    db.set_mutable_wal_range(initial_range);

    let replayed_commit = db.replay_wal_events(events)?;
    // Prefer the timestamp seen during replay; fall back to the state-store hint.
    if let Some(ts) = replayed_commit.or(state_commit_hint) {
        db.commit_clock.advance_to_at_least(ts.saturating_add(1));
    }
    Ok(db)
}
/// Ingests `batch` and then runs minor compaction inline.
#[cfg_attr(not(test), allow(dead_code))]
pub async fn ingest(&self, batch: RecordBatch) -> Result<(), KeyExtractError> {
    const RUN_MINOR_COMPACTION: bool = true;
    self.ingest_impl(batch, RUN_MINOR_COMPACTION).await
}

/// Ingests `batch` without running minor compaction.
#[cfg(not(test))]
pub(crate) async fn ingest_without_minor_compaction(
    &self,
    batch: RecordBatch,
) -> Result<(), KeyExtractError> {
    const RUN_MINOR_COMPACTION: bool = false;
    self.ingest_impl(batch, RUN_MINOR_COMPACTION).await
}
/// Shared ingest path.
///
/// Checks the batch schema, applies L0 backpressure, allocates a commit timestamp, and
/// (when a WAL handle exists) appends and commits the batch to the WAL, awaiting durability
/// of both tickets. It then records the WAL spans, inserts the batch into the mutable
/// memtable, gives the seal policy a chance to seal, and optionally runs minor compaction.
///
/// # Errors
///
/// Returns `SchemaMismatch` for a batch with a different schema. WAL, insert, seal and
/// backpressure failures are propagated; a minor-compaction failure is reported as an Arrow
/// compute error.
async fn ingest_impl(
    &self,
    batch: RecordBatch,
    run_minor_compaction: bool,
) -> Result<(), KeyExtractError> {
    let incoming_schema = batch.schema();
    if self.schema.as_ref() != incoming_schema.as_ref() {
        return Err(KeyExtractError::SchemaMismatch {
            expected: self.schema.clone(),
            actual: incoming_schema,
        });
    }

    self.apply_l0_backpressure().await?;
    let commit_ts = self.next_commit_ts();

    let wal_spans: Vec<(u64, u64)> = match self.wal_handle().cloned() {
        None => Vec::new(),
        Some(handle) => {
            let provisional_id = handle.next_provisional_id();
            let append_ticket = handle
                .txn_append(provisional_id, &batch, commit_ts)
                .await
                .map_err(KeyExtractError::from)?;
            let commit_ticket = handle
                .txn_commit(provisional_id, commit_ts)
                .await
                .map_err(KeyExtractError::from)?;
            // Spans are collected only after each ticket is durable, and published below only
            // once both have succeeded.
            let mut spans = Vec::new();
            for ticket in [append_ticket, commit_ticket] {
                let ack = ticket.durable().await.map_err(KeyExtractError::from)?;
                spans.push((ack.first_seq, ack.last_seq));
            }
            spans
        }
    };
    for (first_seq, last_seq) in wal_spans {
        self.observe_mutable_wal_span(first_seq, last_seq);
    }

    self.insert_into_mutable(batch, commit_ts)?;
    self.maybe_seal_after_insert()?;

    if run_minor_compaction {
        if let Err(err) = self.maybe_run_minor_compaction().await {
            return Err(KeyExtractError::Arrow(ArrowError::ComputeError(format!(
                "minor compaction failed: {err}"
            ))));
        }
    }
    Ok(())
}
pub(crate) fn executor(&self) -> &Arc<E> {
&self.executor
}
#[cfg(test)]
pub fn table_id(&self) -> TableId {
self.manifest_table
}
/// Takes a snapshot of the latest manifest state.
///
/// The read timestamp is one below the commit clock's next value (saturating at zero). The
/// snapshot's latest manifest version, if any, is pinned for the snapshot's lifetime.
///
/// # Errors
///
/// Propagates a failure to read the manifest.
pub async fn begin_snapshot(&self) -> Result<TxSnapshot, SnapshotError> {
    let manifest_snapshot = self
        .manifest
        .snapshot_latest_with_fallback(self.manifest_table, &self.table_meta)
        .await?;
    let manifest_pin = self.snapshot_pin_for_manifest(&manifest_snapshot);
    let read_ts = self.commit_clock.peek().saturating_sub(1);
    let snapshot =
        TxSnapshot::from_table_snapshot(ReadView::new(read_ts), manifest_snapshot, manifest_pin);
    Ok(snapshot)
}
/// Takes a snapshot as of `timestamp`.
///
/// Picks the first listed manifest version whose commit timestamp is at or below
/// `timestamp`, pins it, and loads the manifest at that version. When no versions are
/// listed at all, falls back to the latest manifest snapshot.
///
/// # Errors
///
/// Returns `ManifestError::VersionUnavailable` when versions exist but all of them are newer
/// than `timestamp`; manifest read failures are propagated.
pub async fn snapshot_at(&self, timestamp: Timestamp) -> Result<TxSnapshot, SnapshotError> {
    // NOTE(review): the `find` below assumes the manifest lists versions newest-first, and
    // the limit of 0 is presumably "no limit" — confirm against the manifest API.
    let versions = self
        .manifest
        .list_versions(self.manifest_table, 0)
        .await?;
    let target_version = versions
        .iter()
        .find(|candidate| candidate.commit_timestamp <= timestamp);

    let (manifest_snapshot, manifest_pin) = match (target_version, versions.last()) {
        (Some(version), _) => {
            let pinned_ts = version.commit_timestamp;
            // Pin before loading, so the version stays registered while it is being read.
            let manifest_pin = self.pin_snapshot_version(pinned_ts);
            let manifest_snapshot = self
                .manifest
                .snapshot_at_version(self.manifest_table, pinned_ts)
                .await?;
            debug_assert_eq!(
                manifest_snapshot
                    .latest_version
                    .as_ref()
                    .map(|snapshot_version| snapshot_version.commit_timestamp()),
                Some(pinned_ts)
            );
            (manifest_snapshot, Some(manifest_pin))
        }
        (None, None) => {
            let manifest_snapshot = self
                .manifest
                .snapshot_latest_with_fallback(self.manifest_table, &self.table_meta)
                .await?;
            let manifest_pin = self.snapshot_pin_for_manifest(&manifest_snapshot);
            (manifest_snapshot, manifest_pin)
        }
        (None, Some(oldest)) => {
            return Err(ManifestError::VersionUnavailable {
                requested: timestamp,
                oldest_available: oldest.commit_timestamp,
            }
            .into());
        }
    };

    Ok(TxSnapshot::from_table_snapshot(
        ReadView::new(timestamp),
        manifest_snapshot,
        manifest_pin,
    ))
}
pub async fn list_versions(&self, limit: usize) -> Result<Vec<Version>, ManifestError> {
let mut states = self
.manifest
.list_versions(self.manifest_table, limit)
.await?;
if states.is_empty() {
let snapshot = self
.manifest
.snapshot_latest_with_fallback(self.manifest_table, &self.table_meta)
.await?;
if let Some(latest) = snapshot.latest_version {
states.push(latest);
}
}
Ok(states.into_iter().map(version_from_state).collect())
}
/// Allocates the next commit timestamp from the commit clock.
pub(crate) fn next_commit_ts(&self) -> Timestamp {
    let clock = &self.commit_clock;
    clock.alloc()
}

/// Number of sealed memtables currently held in `seal_state`.
pub(crate) fn num_immutable_segments(&self) -> usize {
    let seal = self.seal_state_lock();
    seal.immutables.len()
}
/// Reads level-0 stats from the latest manifest snapshot.
///
/// Returns zero files and zero bytes when there is no version or no level 0. The byte total
/// is `None` as soon as one level-0 entry lacks stats.
///
/// # Errors
///
/// Propagates manifest read failures.
async fn l0_stats_from_manifest(&self) -> Result<L0Stats, ManifestError> {
    const NO_L0_FILES: L0Stats = L0Stats {
        file_count: 0,
        total_bytes: Some(0),
    };

    let snapshot = self
        .manifest
        .snapshot_latest_with_fallback(self.manifest_table, &self.table_meta)
        .await?;
    let level0 = match snapshot
        .latest_version
        .as_ref()
        .and_then(|version| version.ssts().first())
    {
        Some(level0) => level0,
        None => return Ok(NO_L0_FILES),
    };

    // `try_fold` stops at the first entry without stats and yields `None`.
    let total_bytes = level0.iter().try_fold(0usize, |sum, entry| {
        entry.stats().map(|stats| sum.saturating_add(stats.bytes))
    });
    Ok(L0Stats {
        file_count: level0.len(),
        total_bytes,
    })
}
/// Returns level-0 stats, serving them from `l0_stats_cache` when possible.
///
/// The cached value is returned when it is fresh and `force_refresh` is false, and also
/// whenever another caller is already refreshing (even under `force_refresh`, and even if the
/// cache has never been populated). Otherwise this caller reads the manifest and stores the
/// result in the cache.
///
/// # Errors
///
/// Propagates a manifest read failure; the cache is left unchanged in that case.
async fn l0_stats(
&self,
config: &L0BackpressureConfig,
force_refresh: bool,
) -> Result<L0Stats, ManifestError> {
// `now` is sampled before the manifest read, so the cache entry is stamped with the time the
// refresh started.
let now = self.executor.now();
// The refresh interval is the configured stop delay, floored at 1ms.
let refresh_interval = config.stop_delay_value().max(Duration::from_millis(1));
{
let cache = self.l0_stats_cache.lock().await;
if !force_refresh && cache.is_fresh(now, refresh_interval) {
return Ok(cache.snapshot());
}
// Someone else is refreshing: use what is cached rather than waiting for them.
if self.l0_stats_refreshing.load(Ordering::Acquire) {
return Ok(cache.snapshot());
}
}
// Claim the refresh. Losing the race means another caller claimed it after the check above.
if self
.l0_stats_refreshing
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
let cache = self.l0_stats_cache.lock().await;
return Ok(cache.snapshot());
}
// Clears `l0_stats_refreshing` on every exit path below, including the error return.
let _refresh_guard = L0StatsRefreshGuard::new(&self.l0_stats_refreshing);
// The cache lock is not held while the manifest is read.
let stats_result = self.l0_stats_from_manifest().await;
let mut cache = self.l0_stats_cache.lock().await;
match stats_result {
Ok(stats) => {
cache.update(now, stats);
Ok(stats)
}
Err(err) => Err(err),
}
}
/// Fetches level-0 stats (optionally forcing a refresh) and maps them to a backpressure
/// decision using `config`.
async fn l0_backpressure_decision(
    &self,
    config: &L0BackpressureConfig,
    force_refresh: bool,
) -> Result<BackpressureDecision, ManifestError> {
    self.l0_stats(config, force_refresh)
        .await
        .map(|stats| config.decision(stats))
}
async fn apply_l0_backpressure(&self) -> Result<(), KeyExtractError> {
let Some(config) = &self.l0_backpressure else {
return Ok(());
};
if self.compaction_worker.is_none() {
return Ok(());
}
let mut decision = self
.l0_backpressure_decision(config, false)
.await
.map_err(manifest_error_as_key_extract)?;
match decision {
BackpressureDecision::Proceed => {}
BackpressureDecision::Slowdown(delay) => {
self.kick_compaction_worker();
if let Some(metrics) = self.compaction_metrics.as_ref() {
metrics.record_backpressure(CompactionBackpressureSignal::Slowdown, delay);
}
self.executor.sleep(delay).await;
}
BackpressureDecision::Stall(delay) => {
self.kick_compaction_worker();
loop {
if let Some(metrics) = self.compaction_metrics.as_ref() {
metrics.record_backpressure(CompactionBackpressureSignal::Stall, delay);
}
self.executor.sleep(delay).await;
decision = self
.l0_backpressure_decision(config, true)
.await
.map_err(manifest_error_as_key_extract)?;
match decision {
BackpressureDecision::Stall(_) => continue,
BackpressureDecision::Slowdown(next_delay) => {
if let Some(metrics) = self.compaction_metrics.as_ref() {
metrics.record_backpressure(
CompactionBackpressureSignal::Slowdown,
next_delay,
);
}
self.executor.sleep(next_delay).await;
break;
}
BackpressureDecision::Proceed => break,
}
}
}
}
Ok(())
}
/// Runs minor compaction if one is configured, unless level 0 is stalled.
///
/// With backpressure configured and a compaction worker present, a stall decision kicks the
/// worker and returns `Ok(None)` without compacting. Without a minor compactor this returns
/// `Ok(None)`.
///
/// # Errors
///
/// Manifest failures while evaluating pressure are wrapped in `SsTableError::Manifest`;
/// compactor errors are propagated.
pub(crate) async fn maybe_run_minor_compaction(&self) -> Result<Option<SsTable>, SsTableError> {
    if let Some(config) = self.l0_backpressure.as_ref() {
        if self.compaction_worker.is_some() {
            let decision = self
                .l0_backpressure_decision(config, false)
                .await
                .map_err(SsTableError::Manifest)?;
            if matches!(decision, BackpressureDecision::Stall(_)) {
                self.kick_compaction_worker();
                return Ok(None);
            }
        }
    }
    match self.minor_compaction.as_ref() {
        Some(compaction) => compaction.maybe_compact(self).await,
        None => Ok(None),
    }
}
/// Schedules one background minor-compaction pass on the executor, coalescing bursts.
///
/// At most one task is in flight at a time, guarded by `minor_compaction_pending`. A request
/// that finds a task in flight sets `minor_compaction_rerun`, and the task re-schedules
/// itself when it sees that flag after finishing. Does nothing when no minor compactor is
/// configured.
///
/// The handshake is ordered so that a request cannot be lost:
/// * the task releases `pending` **before** it samples `rerun`;
/// * a requester that failed to claim `pending` sets `rerun` and then tries the claim again.
///
/// If that second claim fails, a task still holds `pending` and has not yet sampled `rerun`,
/// so it will observe the flag. If it succeeds, the requester spawns the pass itself. The
/// flags use `SeqCst` because this is a store-then-load handshake across two atomics, which
/// acquire/release ordering alone does not make safe. The cost is an occasional redundant
/// pass.
#[cfg(not(test))]
pub(crate) fn schedule_background_minor_compaction(db: Arc<Self>)
where
    E: 'static,
{
    if db.minor_compaction.is_none() {
        return;
    }
    let try_claim = || {
        db.minor_compaction_pending
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
    };
    if !try_claim() {
        // A task is in flight: ask it for another pass once it finishes.
        db.minor_compaction_rerun.store(true, Ordering::SeqCst);
        // The task may have released `pending` just before the flag was stored and so never
        // see it. Retrying the claim closes that window.
        if !try_claim() {
            return;
        }
    }
    let runtime = Arc::clone(&db.executor);
    let db_for_task = db;
    runtime.spawn(async move {
        let result = db_for_task.maybe_run_minor_compaction().await;
        // Release the slot first, then look for requests that arrived while running.
        db_for_task
            .minor_compaction_pending
            .store(false, Ordering::SeqCst);
        let rerun = db_for_task
            .minor_compaction_rerun
            .swap(false, Ordering::SeqCst);
        if let Err(err) = result {
            log_warn!(
                component = "compaction",
                event = "minor_compaction_background_failed",
                error = ?err,
            );
        }
        if rerun {
            Self::schedule_background_minor_compaction(db_for_task);
        }
    });
}
/// Flushes all currently sealed memtables into one SST and records it in the manifest.
///
/// Holds `flush_lock` for the whole call. On success the flushed memtables (and the matching
/// WAL ranges) are removed from `seal_state`, WAL segments below the floor are pruned, and the
/// compaction worker is kicked.
///
/// # Errors
///
/// * `SsTableError::NoImmutableSegments` when there is nothing to flush.
/// * `SsTableError::Manifest(Invariant)` when WAL segments cannot be enumerated or the built
///   SST has no data path.
/// * Builder and manifest errors are propagated.
pub(crate) async fn flush_immutables_with_descriptor(
&self,
config: Arc<SsTableConfig>,
descriptor: SsTableDescriptor,
) -> Result<SsTable, SsTableError> {
let _guard = self.flush_lock.lock().await;
// Snapshot the sealed memtables and remember how many there were. More may be sealed while the
// flush is in progress; only the first `flush_count` are removed at the end.
let (immutables_snapshot, flush_count) = {
let seal_read = self.seal_state_lock();
if seal_read.immutables.is_empty() {
return Err(SsTableError::NoImmutableSegments);
}
(seal_read.immutables.clone(), seal_read.immutables.len())
};
let mut builder = SsTableBuilder::new(config, descriptor);
for seg in &immutables_snapshot {
builder.add_immutable(seg)?;
}
// Collect WAL segment references using the manifest floor and the first frame of the live
// mutable range.
let existing_floor = self.manifest_wal_floor().await;
let live_floor = self.mutable_wal_range_snapshot().map(|range| range.first);
let (wal_ids, wal_refs) = if let Some(cfg) = &self.wal_config {
match manifest_ext::collect_wal_segment_refs(cfg, existing_floor.as_ref(), live_floor)
.await
{
Ok(refs) => {
let wal_ids = if refs.is_empty() {
builder.set_wal_ids(None);
None
} else {
let ids: Vec<FileId> = refs.iter().map(|ref_| *ref_.file_id()).collect();
builder.set_wal_ids(Some(ids.clone()));
Some(ids)
};
(wal_ids, Some(refs))
}
// NOTE(review): the underlying error is discarded here; only a fixed message is reported.
Err(_err) => {
return Err(SsTableError::Manifest(ManifestError::Invariant(
"failed to enumerate wal segments",
)));
}
}
} else {
// No WAL configured: the SST carries no WAL ids and the manifest's WAL segments are left alone.
builder.set_wal_ids(None);
(None, None)
};
let executor: E = (*self.executor).clone();
match builder.finish(executor).await {
Ok(table) => {
let descriptor_ref = table.descriptor();
let data_path = descriptor_ref.data_path().cloned().ok_or_else(|| {
SsTableError::Manifest(ManifestError::Invariant(
"sst descriptor missing data path",
))
})?;
// Paths are passed through `manifest_storage_path` with `sst_root` before being recorded.
let data_path = manifest_storage_path(&self.sst_root, &data_path);
let delete_path = descriptor_ref
.delete_path()
.cloned()
.map(|path| manifest_storage_path(&self.sst_root, &path));
let stats = descriptor_ref.stats().cloned();
let sst_entry = SstEntry::new(
descriptor_ref.id().clone(),
stats,
wal_ids.clone(),
data_path,
delete_path,
);
// NOTE(review): `as u32` truncates silently if the level does not fit in 32 bits.
let mut edits = vec![VersionEdit::AddSsts {
level: descriptor_ref.level() as u32,
entries: vec![sst_entry],
}];
// When the SST stats report a max commit timestamp, it becomes the tombstone watermark.
if let Some(stats) = descriptor_ref.stats()
&& let Some(max_commit) = stats.max_commit_ts
{
edits.push(VersionEdit::SetTombstoneWatermark {
watermark: max_commit.get(),
});
}
if let Some(refs) = wal_refs {
edits.push(VersionEdit::SetWalSegments { segments: refs });
}
// All edits are submitted to the manifest in a single call.
self.manifest
.apply_version_edits(self.manifest_table, &edits)
.await?;
self.prune_wal_segments_below_floor().await;
// No `.await` below this point, so the std mutex guard is never held across a suspension.
let mut seal = self.seal_state_lock();
// Remove the flushed prefix. If fewer entries remain than were flushed, clear instead of
// panicking on an out-of-range drain.
let actual_len = seal.immutables.len();
if actual_len >= flush_count {
seal.immutables.drain(0..flush_count);
} else {
seal.immutables.clear();
}
let wal_ranges_len = seal.immutable_wal_ranges.len();
if wal_ranges_len >= flush_count {
seal.immutable_wal_ranges.drain(0..flush_count);
} else {
seal.immutable_wal_ranges.clear();
}
seal.last_seal_at = Some(self.executor.now());
self.kick_compaction_worker();
Ok(table)
}
Err(err) => Err(err),
}
}
async fn manifest_wal_floor(&self) -> Option<WalSegmentRef> {
self.manifest
.wal_floor(self.manifest_table)
.await
.ok()
.flatten()
}
pub(crate) fn set_seal_policy(&mut self, policy: Arc<dyn SealPolicy + Send + Sync>) {
self.policy = policy;
}
pub(crate) fn key_locks(&self) -> &LockMap<KeyOwned> {
&self._key_locks
}
}
#[cfg(test)]
impl<E> DbInner<InMemoryFs, E>
where
    E: Executor + Timer + Clone + 'static,
{
    /// Builds fully in-memory internals for tests.
    ///
    /// Creates an in-memory manifest, registers the default table in it, and assembles the
    /// database over a fresh `InMemoryFs` with no WAL configuration.
    ///
    /// # Errors
    ///
    /// Fails if `config` cannot be built; manifest failures are converted with
    /// `manifest_error_as_key_extract`.
    pub(crate) async fn new(
        config: DynModeConfig,
        executor: Arc<E>,
    ) -> Result<Self, KeyExtractError> {
        use crate::{
            id::FileIdGenerator, manifest::init_in_memory_manifest, mode::table_definition,
        };

        // The definition must be derived before `config` is consumed by `build`.
        let definition = table_definition(&config, builder::DEFAULT_TABLE_NAME);
        let (schema, delete_schema, commit_ack_mode, mem) = config.build()?;

        let file_ids = FileIdGenerator::default();
        let manifest = match init_in_memory_manifest((*executor).clone()).await {
            Ok(manifest) => manifest,
            Err(err) => return Err(manifest_error_as_key_extract(err)),
        };
        let table_meta = match manifest.register_table(&file_ids, &definition).await {
            Ok(meta) => meta,
            Err(err) => return Err(manifest_error_as_key_extract(err)),
        };
        let manifest_table = table_meta.table_id;

        let fs: Arc<dyn DynFs> = Arc::new(InMemoryFs::new());
        let sst_root = Path::default();
        let wal_config = None;
        Ok(Self::from_components(
            schema,
            delete_schema,
            commit_ack_mode,
            mem,
            fs,
            sst_root,
            manifest,
            manifest_table,
            table_meta,
            wal_config,
            executor,
        ))
    }
}