1use std::collections::hash_map;
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use arc_swap::ArcSwap;
8use bytesize::ByteSize;
9use tokio::sync::Notify;
10use tokio::task::AbortHandle;
11use tycho_util::FastHashMap;
12use tycho_util::metrics::{FsUsageBuilder, FsUsageMonitor, spawn_metrics_loop};
13use weedb::{WeakWeeDbRaw, rocksdb};
14
15use crate::config::StorageConfig;
16use crate::fs::{Dir, TempFileStorage};
17use crate::kv::{NamedTables, TableContext, WeeDbExt};
18
/// Name of the subdirectory (under the storage root) that holds file-based storage.
const FILES_SUBDIR: &str = "files";
20
/// Shared storage environment: directory layout, temp-file storage and a
/// registry of RocksDB instances with common options, metrics and manual
/// compaction triggers. Cheap to clone (`Arc` inside).
#[derive(Clone)]
pub struct StorageContext {
    inner: Arc<StorageContextInner>,
}
25
26impl StorageContext {
27 #[cfg(any(test, feature = "test"))]
32 pub async fn new_temp() -> Result<(Self, tempfile::TempDir)> {
33 let tmp_dir = tempfile::tempdir()?;
34 let config = StorageConfig::new_potato(tmp_dir.path());
35 let ctx = StorageContext::new(config).await?;
36 Ok((ctx, tmp_dir))
37 }
38
39 pub async fn new(config: StorageConfig) -> Result<Self> {
40 let root_dir = Dir::new(&config.root_dir)?;
41 let files_dir = root_dir.create_subdir(FILES_SUBDIR)?;
42
43 let temp_files =
44 TempFileStorage::new(&files_dir).context("failed to create temp files storage")?;
45 temp_files.remove_outdated_files().await?;
46
47 let mut fs_usage = FsUsageBuilder::new()
48 .add_path(files_dir.path())
49 .add_path(temp_files.dir().path())
50 .build();
51
52 fs_usage.spawn_metrics_loop(Duration::from_secs(60))?;
53
54 let threads = std::thread::available_parallelism()?.get();
55 let fdlimit = match fdlimit::raise_fd_limit() {
56 Ok(fdlimit::Outcome::LimitRaised { to, .. }) => to,
58 _ => match rlimit::getrlimit(rlimit::Resource::NOFILE) {
60 Ok((limit, _)) => limit,
61 Err(_) => 256,
62 },
63 };
64
65 let mut rocksdb_env =
66 rocksdb::Env::new().context("failed to create a new RocksDB environemnt")?;
67 let thread_pool_size = std::cmp::max(threads as i32 / 2, 2);
68 rocksdb_env.set_background_threads(thread_pool_size);
69 rocksdb_env.set_low_priority_background_threads(thread_pool_size);
70 rocksdb_env.set_high_priority_background_threads(thread_pool_size);
71
72 let rocksdb_instances: Arc<ArcSwap<KnownInstances>> = Default::default();
73 let rocksdb_metrics_handle = if config.rocksdb_enable_metrics {
74 Some(spawn_metrics_loop(
75 &rocksdb_instances,
76 Duration::from_secs(5),
77 async move |this| {
78 let this = this.load_full();
79 for item in this.values() {
80 if let Some(db) = item.weak.upgrade() {
81 db.refresh_metrics();
82 };
83 }
84 },
85 ))
86 } else {
87 None
88 };
89
90 tracing::info!(
91 threads,
92 fdlimit,
93 thread_pool_size,
94 root_dir = %config.root_dir.display(),
95 "storage context created",
96 );
97
98 Ok(Self {
99 inner: Arc::new(StorageContextInner {
100 config,
101 root_dir,
102 files_dir,
103 temp_files,
104 fs_usage,
105 threads,
106 fdlimit,
107 rocksdb_table_context: Default::default(),
108 rocksdb_env,
109 rocksdb_instances_lock: Default::default(),
110 rocksdb_instances,
111 rocksdb_metrics_handle,
112 }),
113 })
114 }
115
116 pub fn config(&self) -> &StorageConfig {
117 &self.inner.config
118 }
119
120 pub fn root_dir(&self) -> &Dir {
121 &self.inner.root_dir
122 }
123
124 pub fn files_dir(&self) -> &Dir {
125 &self.inner.files_dir
126 }
127
128 pub fn temp_files(&self) -> &TempFileStorage {
129 &self.inner.temp_files
130 }
131
132 pub fn threads(&self) -> usize {
133 self.inner.threads
134 }
135
136 pub fn fdlimit(&self) -> u64 {
137 self.inner.fdlimit
138 }
139
140 pub fn fs_usage(&self) -> &FsUsageMonitor {
141 &self.inner.fs_usage
142 }
143
144 pub fn rocksdb_table_context(&self) -> &TableContext {
145 &self.inner.rocksdb_table_context
146 }
147
148 pub fn rocksdb_env(&self) -> &rocksdb::Env {
149 &self.inner.rocksdb_env
150 }
151
152 pub fn trigger_rocksdb_compaction(&self, name: &str) -> bool {
153 let Some(known) = self.inner.rocksdb_instances.load().get(name).cloned() else {
154 return false;
155 };
156 known.compaction_events.notify_waiters();
157 true
158 }
159
160 pub fn add_rocksdb_instance(&self, name: &str, db: &weedb::WeeDbRaw) -> bool {
161 let _guard = self.inner.rocksdb_instances_lock.lock().unwrap();
162
163 let mut items = Arc::unwrap_or_clone(self.inner.rocksdb_instances.load_full());
164
165 let db = weedb::WeeDbRaw::downgrade(db);
167 items.retain(|_, item| !weedb::WeakWeeDbRaw::ptr_eq(&db, &item.weak));
168
169 let updated = match items.entry(name.to_owned()) {
171 hash_map::Entry::Vacant(entry) => {
172 entry.insert(Arc::new(KnownInstance::new(name, db)));
173 true
174 }
175 hash_map::Entry::Occupied(mut entry) => {
176 if !weedb::WeakWeeDbRaw::ptr_eq(&db, &entry.get().weak) {
177 *entry.get_mut() = Arc::new(KnownInstance::new(name, db));
178 true
179 } else {
180 false
181 }
182 }
183 };
184
185 self.inner.rocksdb_instances.store(Arc::new(items));
187 updated
188 }
189
190 pub fn validate_options<T>(&self) -> Result<()>
191 where
192 T: NamedTables<Context = TableContext> + 'static,
193 {
194 let this = self.inner.as_ref();
195 weedb::WeeDbRaw::builder("nonexisting", this.rocksdb_table_context.clone())
197 .with_options(|opts, _| self.apply_default_options(opts))
198 .with_tables::<T>();
199 Ok(())
200 }
201
202 pub fn open_preconfigured<P, T>(&self, subdir: P) -> Result<weedb::WeeDb<T>>
203 where
204 P: AsRef<Path>,
205 T: NamedTables<Context = TableContext> + 'static,
206 {
207 let subdir = subdir.as_ref();
208 tracing::debug!(subdir = %subdir.display(), "opening RocksDB instance");
209
210 let this = self.inner.as_ref();
211
212 let db_dir = this.root_dir.create_subdir(subdir)?;
213 this.fs_usage.add_path(db_dir.path());
214 let db =
215 weedb::WeeDb::<T>::builder_prepared(db_dir.path(), this.rocksdb_table_context.clone())
216 .with_metrics_enabled(this.config.rocksdb_enable_metrics)
217 .with_options(|opts, _| self.apply_default_options(opts))
218 .build()?;
219
220 if let Some(name) = db.db_name() {
221 self.add_rocksdb_instance(name, db.raw());
222 }
223
224 tracing::debug!(current_rocksdb_buffer_usage = ?self.rocksdb_table_context().buffer_usage());
225
226 Ok(db)
227 }
228
229 pub fn apply_default_options(&self, opts: &mut rocksdb::Options) {
230 let this = self.inner.as_ref();
231
232 opts.set_paranoid_checks(false);
233
234 opts.set_max_subcompactions(this.threads as u32 / 2);
236
237 opts.set_max_open_files(this.fdlimit as i32);
239
240 opts.set_log_level(rocksdb::LogLevel::Info);
242 opts.set_keep_log_file_num(2);
243 opts.set_recycle_log_file_num(2);
244 opts.set_max_log_file_size(ByteSize::gib(1).as_u64() as usize);
245
246 opts.create_if_missing(true);
248 opts.create_missing_column_families(true);
249
250 #[allow(deprecated)]
255 opts.set_max_background_flushes(this.threads as i32 / 2);
256 #[allow(deprecated)]
257 opts.set_max_background_compactions(this.threads as i32 / 2);
258
259 opts.set_env(&this.rocksdb_env);
260
261 opts.set_allow_concurrent_memtable_write(false);
262
263 }
270}
271
/// Shared state behind [`StorageContext`]; dropped when the last clone goes away.
struct StorageContextInner {
    /// Storage configuration the context was created with.
    config: StorageConfig,
    /// Root storage directory.
    root_dir: Dir,
    /// Files subdirectory under the root.
    files_dir: Dir,
    /// Temporary file storage (inside the files dir).
    temp_files: TempFileStorage,
    /// Disk usage monitor for all registered storage paths.
    fs_usage: FsUsageMonitor,
    /// Available parallelism captured at construction.
    threads: usize,
    /// Effective file-descriptor limit captured at construction.
    fdlimit: u64,
    /// Table context shared by all RocksDB instances opened via this context.
    rocksdb_table_context: TableContext,
    /// RocksDB environment with pre-sized background thread pools.
    rocksdb_env: rocksdb::Env,
    /// Serializes copy-on-write updates of `rocksdb_instances`.
    rocksdb_instances_lock: Mutex<()>,
    /// Registry of known DB instances, atomically swapped for lock-free reads.
    rocksdb_instances: Arc<ArcSwap<KnownInstances>>,
    /// Handle of the optional metrics refresh task; aborted on drop.
    rocksdb_metrics_handle: Option<AbortHandle>,
}
286
287impl Drop for StorageContextInner {
288 fn drop(&mut self) {
289 if let Some(handle) = &self.rocksdb_metrics_handle {
290 handle.abort();
291 }
292 }
293}
294
/// Registry of tracked RocksDB instances keyed by database name.
type KnownInstances = FastHashMap<String, Arc<KnownInstance>>;
296
/// A registered RocksDB instance tracked by the storage context.
struct KnownInstance {
    /// Weak handle to the database; lets background tasks detect when it is dropped.
    weak: WeakWeeDbRaw,
    /// Signalled to request a manual compaction of this instance.
    compaction_events: Arc<Notify>,
    /// Handle of the spawned compaction-listener task; aborted on drop.
    task_handle: AbortHandle,
}
302
impl KnownInstance {
    /// Registers a compaction-trigger listener for `weak`.
    ///
    /// Spawns a background task that waits on `compaction_events` and runs a
    /// manual compaction each time it is notified; the task exits once the
    /// database has been dropped (the weak upgrade fails) and is aborted when
    /// the `KnownInstance` itself is dropped.
    fn new(name: &str, weak: WeakWeeDbRaw) -> Self {
        let name = name.to_owned();
        let compaction_events = Arc::new(Notify::new());
        let task_handle = tokio::task::spawn({
            let weak = weak.clone();
            let compaction_events = compaction_events.clone();
            async move {
                tracing::debug!(name, "compaction trigger listener started");
                scopeguard::defer! {
                    tracing::debug!(name, "compaction trigger listener stopped");
                }

                // The `Notified` future is created before the loop and re-armed
                // before the (potentially long) compaction so triggers arriving
                // while `compact()` runs are not silently dropped.
                // NOTE(review): `Notify::notify_waiters` only wakes futures that
                // are already being polled — confirm a trigger landing between
                // `notified()` and the next `.await` cannot be missed here.
                let mut notified = compaction_events.notified();
                loop {
                    notified.await;
                    notified = compaction_events.notified();
                    // Stop listening once the database itself is gone.
                    let Some(db) = weak.upgrade() else {
                        break;
                    };

                    db.compact();
                }
            }
        })
        .abort_handle();

        Self {
            weak,
            compaction_events,
            task_handle,
        }
    }
}
337
338impl Drop for KnownInstance {
339 fn drop(&mut self) {
340 self.task_handle.abort();
341 }
342}