rust_rocksdb/
perf.rs

1// Copyright 2020 Tran Tuan Linh
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use libc::{c_int, c_uchar};
16use std::marker::PhantomData;
17
18use crate::cache::Cache;
19use crate::ffi_util::from_cstr_and_free;
20use crate::{DB, DBCommon, ThreadMode, TransactionDB};
21use crate::{Error, db::DBInner, ffi};
22
/// Level of detail collected by RocksDB's perf stats.
///
/// The numeric discriminant of each variant is the value that
/// [`set_perf_stats`] passes to `rocksdb_set_perf_level`.
///
/// NOTE(review): the discriminants therefore have to line up with the
/// `PerfLevel` enum of the RocksDB version that is linked in. Newer RocksDB
/// releases appear to insert `kEnableWait = 3`, which would shift the
/// time-related levels below by one — TODO confirm against the bundled
/// `perf_level.h`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[repr(i32)]
pub enum PerfStatsLevel {
    /// Unknown setting.
    Uninitialized = 0,
    /// Perf stats are disabled.
    Disable = 1,
    /// Only count stats are collected.
    EnableCount = 2,
    /// Count stats plus time stats, except for mutexes.
    EnableTimeExceptForMutex = 3,
    /// In addition to time, CPU time counters are measured. Mutexes are still
    /// excluded from both wall-time and CPU-time measurement.
    EnableTimeAndCPUTimeExceptForMutex = 4,
    /// Count and time stats are both collected.
    EnableTime = 5,
    /// N.B. must always be the last value!
    OutOfBound = 6,
}
42
43// Include the generated PerfMetric enum from perf_enum.rs
44include!("perf_enum.rs");
45
46/// Sets the perf stats level for current thread.
47pub fn set_perf_stats(lvl: PerfStatsLevel) {
48    unsafe {
49        ffi::rocksdb_set_perf_level(lvl as c_int);
50    }
51}
52
53/// Thread local context for gathering performance counter efficiently
54/// and transparently.
55pub struct PerfContext {
56    pub(crate) inner: *mut ffi::rocksdb_perfcontext_t,
57}
58
59impl Default for PerfContext {
60    fn default() -> Self {
61        let ctx = unsafe { ffi::rocksdb_perfcontext_create() };
62        assert!(!ctx.is_null(), "Could not create Perf Context");
63
64        Self { inner: ctx }
65    }
66}
67
68impl Drop for PerfContext {
69    fn drop(&mut self) {
70        unsafe {
71            ffi::rocksdb_perfcontext_destroy(self.inner);
72        }
73    }
74}
75
76impl PerfContext {
77    /// Reset context
78    pub fn reset(&mut self) {
79        unsafe {
80            ffi::rocksdb_perfcontext_reset(self.inner);
81        }
82    }
83
84    /// Get the report on perf
85    pub fn report(&self, exclude_zero_counters: bool) -> String {
86        unsafe {
87            let ptr =
88                ffi::rocksdb_perfcontext_report(self.inner, c_uchar::from(exclude_zero_counters));
89            from_cstr_and_free(ptr)
90        }
91    }
92
93    /// Returns value of a metric
94    pub fn metric(&self, id: PerfMetric) -> u64 {
95        unsafe { ffi::rocksdb_perfcontext_metric(self.inner, id as c_int) }
96    }
97}
98
/// Memory usage stats.
///
/// A plain snapshot of the four approximate counters that
/// [`get_memory_usage_stats`] reads from a [`MemoryUsage`]. Being plain data,
/// it can be debug-printed, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemoryUsageStats {
    /// Approximate memory usage of all the mem-tables
    pub mem_table_total: u64,
    /// Approximate memory usage of un-flushed mem-tables
    pub mem_table_unflushed: u64,
    /// Approximate memory usage of all the table readers
    pub mem_table_readers_total: u64,
    /// Approximate memory usage by cache
    pub cache_total: u64,
}
110
111/// Wrap over memory_usage_t. Hold current memory usage of the specified DB instances and caches
112pub struct MemoryUsage {
113    inner: *mut ffi::rocksdb_memory_usage_t,
114}
115
116impl Drop for MemoryUsage {
117    fn drop(&mut self) {
118        unsafe {
119            ffi::rocksdb_approximate_memory_usage_destroy(self.inner);
120        }
121    }
122}
123
124impl MemoryUsage {
125    /// Approximate memory usage of all the mem-tables
126    pub fn approximate_mem_table_total(&self) -> u64 {
127        unsafe { ffi::rocksdb_approximate_memory_usage_get_mem_table_total(self.inner) }
128    }
129
130    /// Approximate memory usage of un-flushed mem-tables
131    pub fn approximate_mem_table_unflushed(&self) -> u64 {
132        unsafe { ffi::rocksdb_approximate_memory_usage_get_mem_table_unflushed(self.inner) }
133    }
134
135    /// Approximate memory usage of all the table readers
136    pub fn approximate_mem_table_readers_total(&self) -> u64 {
137        unsafe { ffi::rocksdb_approximate_memory_usage_get_mem_table_readers_total(self.inner) }
138    }
139
140    /// Approximate memory usage by cache
141    pub fn approximate_cache_total(&self) -> u64 {
142        unsafe { ffi::rocksdb_approximate_memory_usage_get_cache_total(self.inner) }
143    }
144}
145
146/// Creates [`MemoryUsage`] from DBs and caches.
147///
148/// Most users should call [`get_memory_usage_stats`] instead.
149pub struct MemoryUsageBuilder<'a> {
150    inner: *mut ffi::rocksdb_memory_consumers_t,
151    base_dbs: Vec<*mut ffi::rocksdb_t>,
152    // must not outlive the DBs/caches that are added
153    _marker: PhantomData<&'a ()>,
154}
155
156impl Drop for MemoryUsageBuilder<'_> {
157    fn drop(&mut self) {
158        unsafe {
159            ffi::rocksdb_memory_consumers_destroy(self.inner);
160        }
161        for base_db in &self.base_dbs {
162            unsafe {
163                ffi::rocksdb_transactiondb_close_base_db(*base_db);
164            }
165        }
166    }
167}
168
169impl<'a> MemoryUsageBuilder<'a> {
170    /// Create new instance
171    pub fn new() -> Result<Self, Error> {
172        let mc = unsafe { ffi::rocksdb_memory_consumers_create() };
173        if mc.is_null() {
174            Err(Error::new(
175                "Could not create MemoryUsage builder".to_owned(),
176            ))
177        } else {
178            Ok(Self {
179                inner: mc,
180                base_dbs: Vec::new(),
181                _marker: PhantomData,
182            })
183        }
184    }
185
186    /// Add a DB instance to collect memory usage from it and add up in total stats
187    pub fn add_tx_db<T: ThreadMode>(&mut self, db: &'a TransactionDB<T>) {
188        unsafe {
189            let base_db = ffi::rocksdb_transactiondb_get_base_db(db.inner);
190            ffi::rocksdb_memory_consumers_add_db(self.inner, base_db);
191            // rocksdb_transactiondb_get_base_db allocates a struct that must be freed
192            self.base_dbs.push(base_db);
193        }
194    }
195
196    /// Add a DB instance to collect memory usage from it and add up in total stats
197    pub fn add_db<T: ThreadMode, D: DBInner>(&mut self, db: &'a DBCommon<T, D>) {
198        unsafe {
199            ffi::rocksdb_memory_consumers_add_db(self.inner, db.inner.inner());
200        }
201    }
202
203    /// Add a cache to collect memory usage from it and add up in total stats
204    pub fn add_cache(&mut self, cache: &'a Cache) {
205        unsafe {
206            ffi::rocksdb_memory_consumers_add_cache(self.inner, cache.0.inner.as_ptr());
207        }
208    }
209
210    /// Build up MemoryUsage
211    pub fn build(&self) -> Result<MemoryUsage, Error> {
212        unsafe {
213            let mu = ffi_try!(ffi::rocksdb_approximate_memory_usage_create(self.inner));
214            Ok(MemoryUsage { inner: mu })
215        }
216    }
217}
218
219/// Get memory usage stats from DB instances and Cache instances
220pub fn get_memory_usage_stats(
221    dbs: Option<&[&DB]>,
222    caches: Option<&[&Cache]>,
223) -> Result<MemoryUsageStats, Error> {
224    let mut builder = MemoryUsageBuilder::new()?;
225    if let Some(dbs_) = dbs {
226        for db in dbs_ {
227            builder.add_db(db);
228        }
229    }
230    if let Some(caches_) = caches {
231        for cache in caches_ {
232            builder.add_cache(cache);
233        }
234    }
235
236    let mu = builder.build()?;
237    Ok(MemoryUsageStats {
238        mem_table_total: mu.approximate_mem_table_total(),
239        mem_table_unflushed: mu.approximate_mem_table_unflushed(),
240        mem_table_readers_total: mu.approximate_mem_table_readers_total(),
241        cache_total: mu.approximate_cache_total(),
242    })
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{DB, Options};
    use tempfile::TempDir;

    /// End-to-end check of [`PerfContext`] counters against a real database.
    ///
    /// Writes ten single-byte keys and deletes the even ones, then scans
    /// forward with count-only stats and backward with time stats, checking
    /// the skip counters and `reset` along the way.
    #[test]
    fn test_perf_context_with_db_operations() {
        let temp_dir = TempDir::new().unwrap();
        let mut opts = Options::default();
        opts.create_if_missing(true);
        let db = DB::open(&opts, temp_dir.path()).unwrap();

        // Write keys 0..n and delete every even one, so the iterators below
        // have deletion markers to skip.
        let n = 10;
        let expected_deletes = (n / 2) as u64;
        for i in 0..n {
            let key = vec![i as u8];
            db.put(&key, &key).unwrap();
            if i % 2 == 0 {
                db.delete(&key).unwrap();
            }
        }

        set_perf_stats(PerfStatsLevel::EnableCount);
        let mut ctx = PerfContext::default();

        // Forward scan with an explicit seek so the counters get exercised.
        let mut forward = db.raw_iterator();
        forward.seek_to_first();
        let mut forward_count = 0;
        while forward.valid() {
            forward_count += 1;
            forward.next();
        }

        // Only the odd keys (1, 3, 5, 7, 9) survive the deletes.
        assert_eq!(
            forward_count, 5,
            "Iterator should find 5 valid entries (odd numbers)"
        );

        // Skip counters recorded during the forward scan.
        let internal_key_skipped = ctx.metric(PerfMetric::InternalKeySkippedCount);
        let internal_delete_skipped = ctx.metric(PerfMetric::InternalDeleteSkippedCount);

        // Every deleted key must have been skipped, so the total number of
        // skipped internal keys is at least the number of deletions, and the
        // delete-skip counter matches it exactly.
        assert!(
            internal_key_skipped >= expected_deletes,
            "internal_key_skipped ({}) should be >= {} (deletions)",
            internal_key_skipped,
            expected_deletes
        );
        assert_eq!(
            internal_delete_skipped, expected_deletes,
            "internal_delete_skipped ({internal_delete_skipped}) should equal {} (deleted entries)",
            expected_deletes
        );
        assert_eq!(
            ctx.metric(PerfMetric::SeekInternalSeekTime),
            0,
            "Time metrics should be 0 with EnableCount"
        );

        // `reset` must zero the counters.
        ctx.reset();
        assert_eq!(ctx.metric(PerfMetric::InternalKeySkippedCount), 0);
        assert_eq!(ctx.metric(PerfMetric::InternalDeleteSkippedCount), 0);

        // Switch to the level that also collects timing.
        set_perf_stats(PerfStatsLevel::EnableTime);

        // Backward scan over the same data.
        let mut backward = db.raw_iterator();
        backward.seek_to_last();
        let mut backward_count = 0;
        while backward.valid() {
            backward_count += 1;
            backward.prev();
        }
        assert_eq!(
            backward_count, 5,
            "Backward iteration should also find 5 valid entries"
        );

        // The counters were reset above, so these values cover the backward
        // scan only; they are compared against the forward-scan numbers.
        let key_skipped_after = ctx.metric(PerfMetric::InternalKeySkippedCount);
        let delete_skipped_after = ctx.metric(PerfMetric::InternalDeleteSkippedCount);

        assert!(
            key_skipped_after >= internal_key_skipped,
            "After second iteration, internal_key_skipped ({key_skipped_after}) should be >= first iteration ({internal_key_skipped})",
        );
        assert_eq!(
            delete_skipped_after, expected_deletes,
            "internal_delete_skipped should still be {} after second iteration",
            expected_deletes
        );

        // Leave perf stats disabled for this thread.
        set_perf_stats(PerfStatsLevel::Disable);
    }
}