use std::collections::hash_map;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{Context, Result};
use arc_swap::ArcSwap;
use bytesize::ByteSize;
use tokio::sync::Notify;
use tokio::task::AbortHandle;
use tycho_util::FastHashMap;
use tycho_util::metrics::{FsUsageBuilder, FsUsageMonitor, spawn_metrics_loop};
use weedb::{WeakWeeDbRaw, rocksdb};
use crate::config::StorageConfig;
use crate::fs::{Dir, TempFileStorage};
use crate::kv::{NamedTables, TableContext, WeeDbExt};
const FILES_SUBDIR: &str = "files";
#[derive(Clone)]
pub struct StorageContext {
inner: Arc<StorageContextInner>,
}
impl StorageContext {
#[cfg(any(test, feature = "test"))]
pub async fn new_temp() -> Result<(Self, tempfile::TempDir)> {
let tmp_dir = tempfile::tempdir()?;
let config = StorageConfig::new_potato(tmp_dir.path());
let ctx = StorageContext::new(config).await?;
Ok((ctx, tmp_dir))
}
pub async fn new(config: StorageConfig) -> Result<Self> {
let root_dir = Dir::new(&config.root_dir)?;
let files_dir = root_dir.create_subdir(FILES_SUBDIR)?;
let temp_files =
TempFileStorage::new(&files_dir).context("failed to create temp files storage")?;
temp_files.remove_outdated_files().await?;
let mut fs_usage = FsUsageBuilder::new()
.add_path(files_dir.path())
.add_path(temp_files.dir().path())
.build();
fs_usage.spawn_metrics_loop(Duration::from_secs(60))?;
let threads = std::thread::available_parallelism()?.get();
let fdlimit = match fdlimit::raise_fd_limit() {
Ok(fdlimit::Outcome::LimitRaised { to, .. }) => to,
_ => match rlimit::getrlimit(rlimit::Resource::NOFILE) {
Ok((limit, _)) => limit,
Err(_) => 256,
},
};
let mut rocksdb_env =
rocksdb::Env::new().context("failed to create a new RocksDB environemnt")?;
let thread_pool_size = std::cmp::max(threads as i32 / 2, 2);
rocksdb_env.set_background_threads(thread_pool_size);
rocksdb_env.set_low_priority_background_threads(thread_pool_size);
rocksdb_env.set_high_priority_background_threads(thread_pool_size);
let rocksdb_instances: Arc<ArcSwap<KnownInstances>> = Default::default();
let rocksdb_metrics_handle = if config.rocksdb_enable_metrics {
Some(spawn_metrics_loop(
&rocksdb_instances,
Duration::from_secs(5),
async move |this| {
let this = this.load_full();
for item in this.values() {
if let Some(db) = item.weak.upgrade() {
db.refresh_metrics();
};
}
},
))
} else {
None
};
tracing::info!(
threads,
fdlimit,
thread_pool_size,
root_dir = %config.root_dir.display(),
"storage context created",
);
Ok(Self {
inner: Arc::new(StorageContextInner {
config,
root_dir,
files_dir,
temp_files,
fs_usage,
threads,
fdlimit,
rocksdb_table_context: Default::default(),
rocksdb_env,
rocksdb_instances_lock: Default::default(),
rocksdb_instances,
rocksdb_metrics_handle,
}),
})
}
pub fn config(&self) -> &StorageConfig {
&self.inner.config
}
pub fn root_dir(&self) -> &Dir {
&self.inner.root_dir
}
pub fn files_dir(&self) -> &Dir {
&self.inner.files_dir
}
pub fn temp_files(&self) -> &TempFileStorage {
&self.inner.temp_files
}
pub fn threads(&self) -> usize {
self.inner.threads
}
pub fn fdlimit(&self) -> u64 {
self.inner.fdlimit
}
pub fn fs_usage(&self) -> &FsUsageMonitor {
&self.inner.fs_usage
}
pub fn rocksdb_table_context(&self) -> &TableContext {
&self.inner.rocksdb_table_context
}
pub fn rocksdb_env(&self) -> &rocksdb::Env {
&self.inner.rocksdb_env
}
pub fn trigger_rocksdb_compaction(&self, name: &str) -> bool {
let Some(known) = self.inner.rocksdb_instances.load().get(name).cloned() else {
return false;
};
known.compaction_events.notify_waiters();
true
}
pub fn add_rocksdb_instance(&self, name: &str, db: &weedb::WeeDbRaw) -> bool {
let _guard = self.inner.rocksdb_instances_lock.lock().unwrap();
let mut items = Arc::unwrap_or_clone(self.inner.rocksdb_instances.load_full());
let db = weedb::WeeDbRaw::downgrade(db);
items.retain(|_, item| !weedb::WeakWeeDbRaw::ptr_eq(&db, &item.weak));
let updated = match items.entry(name.to_owned()) {
hash_map::Entry::Vacant(entry) => {
entry.insert(Arc::new(KnownInstance::new(name, db)));
true
}
hash_map::Entry::Occupied(mut entry) => {
if !weedb::WeakWeeDbRaw::ptr_eq(&db, &entry.get().weak) {
*entry.get_mut() = Arc::new(KnownInstance::new(name, db));
true
} else {
false
}
}
};
self.inner.rocksdb_instances.store(Arc::new(items));
updated
}
pub fn validate_options<T>(&self) -> Result<()>
where
T: NamedTables<Context = TableContext> + 'static,
{
let this = self.inner.as_ref();
weedb::WeeDbRaw::builder("nonexisting", this.rocksdb_table_context.clone())
.with_options(|opts, _| self.apply_default_options(opts))
.with_tables::<T>();
Ok(())
}
pub fn open_preconfigured<P, T>(&self, subdir: P) -> Result<weedb::WeeDb<T>>
where
P: AsRef<Path>,
T: NamedTables<Context = TableContext> + 'static,
{
let subdir = subdir.as_ref();
tracing::debug!(subdir = %subdir.display(), "opening RocksDB instance");
let this = self.inner.as_ref();
let db_dir = this.root_dir.create_subdir(subdir)?;
this.fs_usage.add_path(db_dir.path());
let db =
weedb::WeeDb::<T>::builder_prepared(db_dir.path(), this.rocksdb_table_context.clone())
.with_metrics_enabled(this.config.rocksdb_enable_metrics)
.with_options(|opts, _| self.apply_default_options(opts))
.build()?;
if let Some(name) = db.db_name() {
self.add_rocksdb_instance(name, db.raw());
}
tracing::debug!(current_rocksdb_buffer_usage = ?self.rocksdb_table_context().buffer_usage());
Ok(db)
}
pub fn apply_default_options(&self, opts: &mut rocksdb::Options) {
let this = self.inner.as_ref();
opts.set_paranoid_checks(false);
opts.set_max_subcompactions(this.threads as u32 / 2);
opts.set_max_open_files(this.fdlimit as i32);
opts.set_log_level(rocksdb::LogLevel::Info);
opts.set_keep_log_file_num(2);
opts.set_recycle_log_file_num(2);
opts.set_max_log_file_size(ByteSize::gib(1).as_u64() as usize);
opts.create_if_missing(true);
opts.create_missing_column_families(true);
#[allow(deprecated)]
opts.set_max_background_flushes(this.threads as i32 / 2);
#[allow(deprecated)]
opts.set_max_background_compactions(this.threads as i32 / 2);
opts.set_env(&this.rocksdb_env);
opts.set_allow_concurrent_memtable_write(false);
}
}
struct StorageContextInner {
config: StorageConfig,
root_dir: Dir,
files_dir: Dir,
temp_files: TempFileStorage,
fs_usage: FsUsageMonitor,
threads: usize,
fdlimit: u64,
rocksdb_table_context: TableContext,
rocksdb_env: rocksdb::Env,
rocksdb_instances_lock: Mutex<()>,
rocksdb_instances: Arc<ArcSwap<KnownInstances>>,
rocksdb_metrics_handle: Option<AbortHandle>,
}
impl Drop for StorageContextInner {
fn drop(&mut self) {
if let Some(handle) = &self.rocksdb_metrics_handle {
handle.abort();
}
}
}
type KnownInstances = FastHashMap<String, Arc<KnownInstance>>;
struct KnownInstance {
weak: WeakWeeDbRaw,
compaction_events: Arc<Notify>,
task_handle: AbortHandle,
}
impl KnownInstance {
fn new(name: &str, weak: WeakWeeDbRaw) -> Self {
let name = name.to_owned();
let compaction_events = Arc::new(Notify::new());
let task_handle = tokio::task::spawn({
let weak = weak.clone();
let compaction_events = compaction_events.clone();
async move {
tracing::debug!(name, "compaction trigger listener started");
scopeguard::defer! {
tracing::debug!(name, "compaction trigger listener stopped");
}
let mut notified = compaction_events.notified();
loop {
notified.await;
notified = compaction_events.notified();
let Some(db) = weak.upgrade() else {
break;
};
db.compact();
}
}
})
.abort_handle();
Self {
weak,
compaction_events,
task_handle,
}
}
}
impl Drop for KnownInstance {
fn drop(&mut self) {
self.task_handle.abort();
}
}