// tycho_storage — context.rs

1use std::collections::hash_map;
2use std::path::Path;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use arc_swap::ArcSwap;
8use bytesize::ByteSize;
9use tokio::sync::Notify;
10use tokio::task::AbortHandle;
11use tycho_util::FastHashMap;
12use tycho_util::metrics::{FsUsageBuilder, FsUsageMonitor, spawn_metrics_loop};
13use weedb::{WeakWeeDbRaw, rocksdb};
14
15use crate::config::StorageConfig;
16use crate::fs::{Dir, TempFileStorage};
17use crate::kv::{NamedTables, TableContext, WeeDbExt};
18
19const FILES_SUBDIR: &str = "files";
20
/// Shared storage environment: directory layout, temp-file storage,
/// filesystem-usage metrics and common RocksDB settings.
///
/// Cheap to clone — all clones share the same [`StorageContextInner`]
/// behind an `Arc`.
#[derive(Clone)]
pub struct StorageContext {
    inner: Arc<StorageContextInner>,
}
25
26impl StorageContext {
27    /// Creates a new temporary storage context with potato config.
28    ///
29    /// NOTE: Temp dir must live longer than the storage,
30    /// otherwise compaction filter will not work.
31    #[cfg(any(test, feature = "test"))]
32    pub async fn new_temp() -> Result<(Self, tempfile::TempDir)> {
33        let tmp_dir = tempfile::tempdir()?;
34        let config = StorageConfig::new_potato(tmp_dir.path());
35        let ctx = StorageContext::new(config).await?;
36        Ok((ctx, tmp_dir))
37    }
38
39    pub async fn new(config: StorageConfig) -> Result<Self> {
40        let root_dir = Dir::new(&config.root_dir)?;
41        let files_dir = root_dir.create_subdir(FILES_SUBDIR)?;
42
43        let temp_files =
44            TempFileStorage::new(&files_dir).context("failed to create temp files storage")?;
45        temp_files.remove_outdated_files().await?;
46
47        let mut fs_usage = FsUsageBuilder::new()
48            .add_path(files_dir.path())
49            .add_path(temp_files.dir().path())
50            .build();
51
52        fs_usage.spawn_metrics_loop(Duration::from_secs(60))?;
53
54        let threads = std::thread::available_parallelism()?.get();
55        let fdlimit = match fdlimit::raise_fd_limit() {
56            // New fd limit
57            Ok(fdlimit::Outcome::LimitRaised { to, .. }) => to,
58            // Current soft limit
59            _ => match rlimit::getrlimit(rlimit::Resource::NOFILE) {
60                Ok((limit, _)) => limit,
61                Err(_) => 256,
62            },
63        };
64
65        let mut rocksdb_env =
66            rocksdb::Env::new().context("failed to create a new RocksDB environemnt")?;
67        let thread_pool_size = std::cmp::max(threads as i32 / 2, 2);
68        rocksdb_env.set_background_threads(thread_pool_size);
69        rocksdb_env.set_low_priority_background_threads(thread_pool_size);
70        rocksdb_env.set_high_priority_background_threads(thread_pool_size);
71
72        let rocksdb_instances: Arc<ArcSwap<KnownInstances>> = Default::default();
73        let rocksdb_metrics_handle = if config.rocksdb_enable_metrics {
74            Some(spawn_metrics_loop(
75                &rocksdb_instances,
76                Duration::from_secs(5),
77                async move |this| {
78                    let this = this.load_full();
79                    for item in this.values() {
80                        if let Some(db) = item.weak.upgrade() {
81                            db.refresh_metrics();
82                        };
83                    }
84                },
85            ))
86        } else {
87            None
88        };
89
90        tracing::info!(
91            threads,
92            fdlimit,
93            thread_pool_size,
94            root_dir = %config.root_dir.display(),
95            "storage context created",
96        );
97
98        Ok(Self {
99            inner: Arc::new(StorageContextInner {
100                config,
101                root_dir,
102                files_dir,
103                temp_files,
104                fs_usage,
105                threads,
106                fdlimit,
107                rocksdb_table_context: Default::default(),
108                rocksdb_env,
109                rocksdb_instances_lock: Default::default(),
110                rocksdb_instances,
111                rocksdb_metrics_handle,
112            }),
113        })
114    }
115
116    pub fn config(&self) -> &StorageConfig {
117        &self.inner.config
118    }
119
120    pub fn root_dir(&self) -> &Dir {
121        &self.inner.root_dir
122    }
123
124    pub fn files_dir(&self) -> &Dir {
125        &self.inner.files_dir
126    }
127
128    pub fn temp_files(&self) -> &TempFileStorage {
129        &self.inner.temp_files
130    }
131
132    pub fn threads(&self) -> usize {
133        self.inner.threads
134    }
135
136    pub fn fdlimit(&self) -> u64 {
137        self.inner.fdlimit
138    }
139
140    pub fn fs_usage(&self) -> &FsUsageMonitor {
141        &self.inner.fs_usage
142    }
143
144    pub fn rocksdb_table_context(&self) -> &TableContext {
145        &self.inner.rocksdb_table_context
146    }
147
148    pub fn rocksdb_env(&self) -> &rocksdb::Env {
149        &self.inner.rocksdb_env
150    }
151
152    pub fn trigger_rocksdb_compaction(&self, name: &str) -> bool {
153        let Some(known) = self.inner.rocksdb_instances.load().get(name).cloned() else {
154            return false;
155        };
156        known.compaction_events.notify_waiters();
157        true
158    }
159
160    pub fn add_rocksdb_instance(&self, name: &str, db: &weedb::WeeDbRaw) -> bool {
161        let _guard = self.inner.rocksdb_instances_lock.lock().unwrap();
162
163        let mut items = Arc::unwrap_or_clone(self.inner.rocksdb_instances.load_full());
164
165        // Remove all identical items just in case.
166        let db = weedb::WeeDbRaw::downgrade(db);
167        items.retain(|_, item| !weedb::WeakWeeDbRaw::ptr_eq(&db, &item.weak));
168
169        // Insert a new item.
170        let updated = match items.entry(name.to_owned()) {
171            hash_map::Entry::Vacant(entry) => {
172                entry.insert(Arc::new(KnownInstance::new(name, db)));
173                true
174            }
175            hash_map::Entry::Occupied(mut entry) => {
176                if !weedb::WeakWeeDbRaw::ptr_eq(&db, &entry.get().weak) {
177                    *entry.get_mut() = Arc::new(KnownInstance::new(name, db));
178                    true
179                } else {
180                    false
181                }
182            }
183        };
184
185        // Store the updated items back
186        self.inner.rocksdb_instances.store(Arc::new(items));
187        updated
188    }
189
190    pub fn validate_options<T>(&self) -> Result<()>
191    where
192        T: NamedTables<Context = TableContext> + 'static,
193    {
194        let this = self.inner.as_ref();
195        // TODO: Make configuration fallible in `weedb`?
196        weedb::WeeDbRaw::builder("nonexisting", this.rocksdb_table_context.clone())
197            .with_options(|opts, _| self.apply_default_options(opts))
198            .with_tables::<T>();
199        Ok(())
200    }
201
202    pub fn open_preconfigured<P, T>(&self, subdir: P) -> Result<weedb::WeeDb<T>>
203    where
204        P: AsRef<Path>,
205        T: NamedTables<Context = TableContext> + 'static,
206    {
207        let subdir = subdir.as_ref();
208        tracing::debug!(subdir = %subdir.display(), "opening RocksDB instance");
209
210        let this = self.inner.as_ref();
211
212        let db_dir = this.root_dir.create_subdir(subdir)?;
213        this.fs_usage.add_path(db_dir.path());
214        let db =
215            weedb::WeeDb::<T>::builder_prepared(db_dir.path(), this.rocksdb_table_context.clone())
216                .with_metrics_enabled(this.config.rocksdb_enable_metrics)
217                .with_options(|opts, _| self.apply_default_options(opts))
218                .build()?;
219
220        if let Some(name) = db.db_name() {
221            self.add_rocksdb_instance(name, db.raw());
222        }
223
224        tracing::debug!(current_rocksdb_buffer_usage = ?self.rocksdb_table_context().buffer_usage());
225
226        Ok(db)
227    }
228
229    pub fn apply_default_options(&self, opts: &mut rocksdb::Options) {
230        let this = self.inner.as_ref();
231
232        opts.set_paranoid_checks(false);
233
234        // parallel compactions finishes faster - less write stalls
235        opts.set_max_subcompactions(this.threads as u32 / 2);
236
237        // io
238        opts.set_max_open_files(this.fdlimit as i32);
239
240        // logging
241        opts.set_log_level(rocksdb::LogLevel::Info);
242        opts.set_keep_log_file_num(2);
243        opts.set_recycle_log_file_num(2);
244        opts.set_max_log_file_size(ByteSize::gib(1).as_u64() as usize);
245
246        // cf
247        opts.create_if_missing(true);
248        opts.create_missing_column_families(true);
249
250        // cpu
251        // https://github.com/facebook/rocksdb/blob/0560544e86c1f97f8d1da348f2647aadaefbd095/options/options.cc#L680-L685
252        // docs are lying as always
253        // so fuck this deprecation warning
254        #[allow(deprecated)]
255        opts.set_max_background_flushes(this.threads as i32 / 2);
256        #[allow(deprecated)]
257        opts.set_max_background_compactions(this.threads as i32 / 2);
258
259        opts.set_env(&this.rocksdb_env);
260
261        opts.set_allow_concurrent_memtable_write(false);
262
263        // debug
264        // NOTE: could slower everything a bit in some cloud environments.
265        //       See: https://github.com/facebook/rocksdb/issues/3889
266        //
267        // opts.enable_statistics();
268        // opts.set_stats_dump_period_sec(600);
269    }
270}
271
/// State shared by all clones of [`StorageContext`].
struct StorageContextInner {
    /// Storage configuration this context was created with.
    config: StorageConfig,
    /// Root directory of the storage (`config.root_dir`).
    root_dir: Dir,
    /// `files` subdirectory of the root dir.
    files_dir: Dir,
    /// Temporary file storage inside the files dir.
    temp_files: TempFileStorage,
    /// Filesystem usage monitor covering the storage directories.
    fs_usage: FsUsageMonitor,
    /// Available parallelism captured at creation time.
    threads: usize,
    /// Effective file descriptor limit captured at creation time.
    fdlimit: u64,
    /// Table context shared by all RocksDB instances.
    rocksdb_table_context: TableContext,
    /// RocksDB environment shared by all instances (background thread pools).
    rocksdb_env: rocksdb::Env,
    /// Serializes writers of `rocksdb_instances`; readers use the
    /// `ArcSwap` without locking.
    rocksdb_instances_lock: Mutex<()>,
    /// Registry of known RocksDB instances by database name.
    rocksdb_instances: Arc<ArcSwap<KnownInstances>>,
    /// Handle of the RocksDB metrics loop task, if metrics are enabled.
    rocksdb_metrics_handle: Option<AbortHandle>,
}
286
287impl Drop for StorageContextInner {
288    fn drop(&mut self) {
289        if let Some(handle) = &self.rocksdb_metrics_handle {
290            handle.abort();
291        }
292    }
293}
294
/// Registry of known RocksDB instances, keyed by database name.
type KnownInstances = FastHashMap<String, Arc<KnownInstance>>;
296
/// A registered RocksDB instance together with its compaction trigger task.
struct KnownInstance {
    /// Weak handle to the database.
    weak: WeakWeeDbRaw,
    /// Notifier used to request a manual compaction.
    compaction_events: Arc<Notify>,
    /// Handle of the compaction listener task; aborted on drop.
    task_handle: AbortHandle,
}
302
303impl KnownInstance {
304    fn new(name: &str, weak: WeakWeeDbRaw) -> Self {
305        let name = name.to_owned();
306        let compaction_events = Arc::new(Notify::new());
307        let task_handle = tokio::task::spawn({
308            let weak = weak.clone();
309            let compaction_events = compaction_events.clone();
310            async move {
311                tracing::debug!(name, "compaction trigger listener started");
312                scopeguard::defer! {
313                    tracing::debug!(name, "compaction trigger listener stopped");
314                }
315
316                let mut notified = compaction_events.notified();
317                loop {
318                    notified.await;
319                    notified = compaction_events.notified();
320                    let Some(db) = weak.upgrade() else {
321                        break;
322                    };
323
324                    db.compact();
325                }
326            }
327        })
328        .abort_handle();
329
330        Self {
331            weak,
332            compaction_events,
333            task_handle,
334        }
335    }
336}
337
338impl Drop for KnownInstance {
339    fn drop(&mut self) {
340        self.task_handle.abort();
341    }
342}