Skip to main content

calimero_store_rocksdb/
lib.rs

//! # RocksDB Storage Backend
//!
//! This module provides a RocksDB-based implementation of the `Database` trait.
//!
//! ## Why No Connection Pool?
//!
//! RocksDB already manages resources internally and does not require an external
//! connection pool. Here's why:
//!
//! 1. **Thread Safety**: The `DB` object is thread-safe and designed to be shared
//!    across multiple threads. A single `DB` instance can handle concurrent
//!    read/write operations safely.
//!
//! 2. **Internal File Handle Management**: RocksDB uses an LRU cache to manage
//!    open file handles internally. The `max_open_files` option controls how many
//!    SST files RocksDB keeps open simultaneously. When this limit is reached,
//!    RocksDB automatically closes least-recently-used file handles.
//!
//! 3. **Block Cache**: RocksDB maintains its own block cache for frequently
//!    accessed data, reducing disk I/O without external caching.
//!
//! 4. **File Locking**: RocksDB locks the database directory while it is open,
//!    so a given path can be held by only one `DB` instance at a time. A second
//!    attempt to open the same path fails, whether it comes from another process
//!    or from the same one.
//!
//! 5. **The `Store` Wrapper**: The higher-level `Store` type already wraps the
//!    database in an `Arc`, allowing it to be cloned and shared efficiently
//!    across the application.
//!
//! ## Recommended Usage
//!
//! ```ignore
//! // Open the database once
//! let store = Store::open::<RocksDB>(&config)?;
//!
//! // Clone and share the store (cheap Arc clone)
//! let store_clone = store.clone();
//!
//! // Both handles share the same underlying RocksDB instance
//! ```
//!
//! ## Configuration
//!
//! Resource limits are configured through RocksDB's native options in the `open`
//! method:
//! - `max_open_files`: Controls file descriptor usage (default: 256)
//! - Block cache: 128MB LRU cache for frequently accessed blocks
//!
//! If you need to adjust these settings, modify the `Options` in `RocksDB::open()`.
50
51#[cfg(test)]
52mod tests;
53
54use calimero_store::config::StoreConfig;
55use calimero_store::db::{Column, Database};
56use calimero_store::iter::{DBIter, Iter};
57use calimero_store::slice::Slice;
58use calimero_store::tx::{Operation, Transaction};
59use eyre::{bail, Result as EyreResult};
60use rocksdb::{
61    ColumnFamily, DBRawIteratorWithThreadMode, Options, ReadOptions, Snapshot, WriteBatch, DB,
62};
63use strum::IntoEnumIterator;
64
/// Upper bound on the number of files RocksDB keeps open at once.
///
/// Passed to `Options::set_max_open_files` when the database is opened, which
/// caps file descriptor usage. RocksDB evicts file handles from its internal
/// LRU cache once this limit is hit.
const DEFAULT_MAX_OPEN_FILES: i32 = 256;

/// Capacity of the LRU block cache, in bytes (128 MiB).
///
/// Hot data blocks are kept in this in-memory cache so that repeated reads
/// do not have to go back to disk.
const DEFAULT_BLOCK_CACHE_SIZE: usize = 128 * (1 << 20); // 1 << 20 bytes = 1 MiB
76
77/// RocksDB database wrapper implementing the `Database` trait.
78///
79/// This is a thin wrapper around RocksDB's `DB` type. The `DB` instance is
80/// thread-safe and handles its own resource management internally.
81///
82/// ## Resource Management
83///
84/// RocksDB manages resources internally - there is no need for an external
85/// connection pool. Key points:
86///
87/// - **Single instance per path**: RocksDB uses file locking; only one `DB`
88///   can be open per database path per process.
89/// - **Thread-safe**: Share the `RocksDB` instance (or the `Store` wrapper)
90///   across threads freely.
91/// - **Automatic file handle management**: RocksDB's internal LRU cache
92///   manages open file handles based on `max_open_files`.
93///
94/// ## Sharing
95///
96/// To share a database across your application, use the `Store` wrapper which
97/// provides `Arc`-based sharing, or wrap `RocksDB` in an `Arc` yourself.
98#[derive(Debug)]
99pub struct RocksDB {
100    db: DB,
101}
102
103impl RocksDB {
104    fn cf_handle(&self, column: Column) -> Option<&ColumnFamily> {
105        self.db.cf_handle(column.as_ref())
106    }
107
108    fn try_cf_handle(&self, column: Column) -> EyreResult<&ColumnFamily> {
109        let Some(cf_handle) = self.cf_handle(column) else {
110            bail!("unknown column family: {:?}", column);
111        };
112
113        Ok(cf_handle)
114    }
115}
116
117impl Database<'_> for RocksDB {
118    fn open(config: &StoreConfig) -> EyreResult<Self> {
119        let mut options = Options::default();
120
121        options.create_if_missing(true);
122        options.create_missing_column_families(true);
123
124        // Limit file descriptor usage. RocksDB manages an internal LRU cache
125        // for file handles, automatically closing least-recently-used files
126        // when this limit is reached.
127        options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
128
129        // Configure block cache for better read performance.
130        // This cache stores frequently accessed data blocks in memory.
131        let cache = rocksdb::Cache::new_lru_cache(DEFAULT_BLOCK_CACHE_SIZE);
132        let mut block_opts = rocksdb::BlockBasedOptions::default();
133        block_opts.set_block_cache(&cache);
134        options.set_block_based_table_factory(&block_opts);
135
136        Ok(Self {
137            db: DB::open_cf(&options, &config.path, Column::iter())?,
138        })
139    }
140
141    fn has(&self, col: Column, key: Slice<'_>) -> EyreResult<bool> {
142        let cf_handle = self.try_cf_handle(col)?;
143
144        let exists = self.db.key_may_exist_cf(cf_handle, key.as_ref())
145            && self.get(col, key).map(|value| value.is_some())?;
146
147        Ok(exists)
148    }
149
150    fn get(&self, col: Column, key: Slice<'_>) -> EyreResult<Option<Slice<'_>>> {
151        let cf_handle = self.try_cf_handle(col)?;
152
153        let value = self.db.get_pinned_cf(cf_handle, key.as_ref())?;
154
155        Ok(value.map(Slice::from_owned))
156    }
157
158    fn put(&self, col: Column, key: Slice<'_>, value: Slice<'_>) -> EyreResult<()> {
159        let cf_handle = self.try_cf_handle(col)?;
160
161        self.db.put_cf(cf_handle, key.as_ref(), value.as_ref())?;
162
163        Ok(())
164    }
165
166    fn delete(&self, col: Column, key: Slice<'_>) -> EyreResult<()> {
167        let cf_handle = self.try_cf_handle(col)?;
168
169        self.db.delete_cf(cf_handle, key.as_ref())?;
170
171        Ok(())
172    }
173
174    fn iter(&self, col: Column) -> EyreResult<Iter<'_>> {
175        let cf_handle = self.try_cf_handle(col)?;
176
177        let mut iter = self.db.raw_iterator_cf(cf_handle);
178
179        iter.seek_to_first();
180
181        Ok(Iter::new(DBIterator { ready: true, iter }))
182    }
183
184    fn apply(&self, tx: &Transaction<'_>) -> EyreResult<()> {
185        let mut batch = WriteBatch::default();
186
187        let mut unknown_cfs = vec![];
188
189        for (entry, op) in tx.iter() {
190            let (col, key) = (entry.column(), entry.key());
191
192            let Some(cf) = self.cf_handle(col) else {
193                unknown_cfs.push(col);
194                continue;
195            };
196            match op {
197                Operation::Put { value } => batch.put_cf(cf, key, value),
198                Operation::Delete => batch.delete_cf(cf, key),
199            }
200        }
201
202        if !unknown_cfs.is_empty() {
203            bail!("unknown column families: {:?}", unknown_cfs);
204        }
205
206        self.db.write(batch)?;
207
208        Ok(())
209    }
210
211    fn iter_snapshot(&self, col: Column) -> EyreResult<Iter<'_>> {
212        let cf_handle = self.try_cf_handle(col)?;
213        let snapshot = self.db.snapshot();
214
215        // Create read options with the snapshot pinned
216        let mut read_opts = ReadOptions::default();
217        read_opts.set_snapshot(&snapshot);
218
219        // Create iterator with snapshot-pinned read options
220        let mut iter = self.db.raw_iterator_cf_opt(cf_handle, read_opts);
221        iter.seek_to_first();
222
223        Ok(Iter::new(SnapshotIterator {
224            ready: true,
225            iter,
226            _snapshot: snapshot,
227        }))
228    }
229}
230
231struct DBIterator<'a> {
232    ready: bool,
233    iter: DBRawIteratorWithThreadMode<'a, DB>,
234}
235
236/// Iterator that holds a RocksDB snapshot for consistent reads.
237///
238/// The snapshot is stored alongside the iterator to ensure it outlives
239/// the iterator. The iterator sees a frozen point-in-time view of the DB.
240struct SnapshotIterator<'a> {
241    ready: bool,
242    /// The raw iterator over the snapshot.
243    /// SAFETY: `iter` must be declared before `_snapshot` because Rust drops
244    /// struct fields in declaration order (top-to-bottom). The iterator holds
245    /// references into the snapshot's data, so it must be dropped first.
246    iter: DBRawIteratorWithThreadMode<'a, DB>,
247    /// Snapshot must outlive the iterator. Declared after `iter` to ensure
248    /// correct drop order.
249    _snapshot: Snapshot<'a>,
250}
251
252impl DBIter for DBIterator<'_> {
253    fn seek(&mut self, key: Slice<'_>) -> EyreResult<Option<Slice<'_>>> {
254        self.iter.seek(key);
255
256        self.ready = false;
257
258        Ok(self.iter.key().map(Into::into))
259    }
260
261    fn next(&mut self) -> EyreResult<Option<Slice<'_>>> {
262        if self.ready {
263            self.ready = false;
264        } else {
265            self.iter.next();
266        }
267
268        Ok(self.iter.key().map(Into::into))
269    }
270
271    fn read(&self) -> EyreResult<Slice<'_>> {
272        let Some(value) = self.iter.value() else {
273            bail!("missing value for iterator entry {:?}", self.iter.key());
274        };
275
276        Ok(value.into())
277    }
278}
279
280impl DBIter for SnapshotIterator<'_> {
281    fn seek(&mut self, key: Slice<'_>) -> EyreResult<Option<Slice<'_>>> {
282        self.iter.seek(key);
283
284        self.ready = false;
285
286        Ok(self.iter.key().map(Into::into))
287    }
288
289    fn next(&mut self) -> EyreResult<Option<Slice<'_>>> {
290        if self.ready {
291            self.ready = false;
292        } else {
293            self.iter.next();
294        }
295
296        Ok(self.iter.key().map(Into::into))
297    }
298
299    fn read(&self) -> EyreResult<Slice<'_>> {
300        let Some(value) = self.iter.value() else {
301            bail!("missing value for iterator entry {:?}", self.iter.key());
302        };
303
304        Ok(value.into())
305    }
306}