async_rocksdb/
lib.rs

1//! # async-rocksdb
2//!
3//! An ergonomic, async wrapper for RocksDB in Rust.
4//!
5//! Provides non-blocking operations suitable for Tokio-based applications.
6//!
7//! ## Features
8//!
9//! - Async reads/writes
10//! - Column families
11//! - Snapshots
12//! - Prefix scans
13//! - Batch operations
14//! - Manual flush/compaction
15
16// src/lib.rs
17
18use rocksdb::{
19    DB, Options, BlockBasedOptions, Cache,
20    ColumnFamilyDescriptor, DBCompressionType, IteratorMode, ReadOptions,
21};
22use std::path::Path;
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::task;
26
27/// Error type for async-rocksdb operations.
28#[derive(Error, Debug)]
29pub enum AsyncRocksError {
30    #[error("RocksDB error: {0}")]
31    Rocks(#[from] rocksdb::Error),
32    #[error("Task failed: {0}")]
33    Join(#[from] task::JoinError),
34    #[error("Column family not found: {0}")]
35    ColumnFamilyNotFound(String),
36}
37
38/// Configuration for a single column family.
39pub struct ColumnFamilyConfig {
40    name: String,
41    options: Options,
42}
43
44impl ColumnFamilyConfig {
45    /// Creates a new column family configuration with the given name.
46    /// 
47    /// The column family will be created if it doesn't exist when the database is opened.
48    pub fn new(name: &str) -> Self {
49        let mut options = Options::default();
50        options.create_if_missing(true);
51        Self {
52            name: name.to_string(),
53            options,
54        }
55    }
56
57    /// Sets the compression type for this column family.
58    pub fn compression(mut self, compression: DBCompressionType) -> Self {
59        self.options.set_compression_type(compression);
60        self
61    }
62
63    /// Sets the block cache size for this column family.
64    pub fn block_cache_size(mut self, size_bytes: usize) -> Self {
65        let cache = Cache::new_lru_cache(size_bytes);
66        let mut block_opts = BlockBasedOptions::default();
67        block_opts.set_block_cache(&cache);
68        self.options.set_block_based_table_factory(&block_opts);
69        self
70    }
71}
72
73/// Builder for configuring and opening an [`AsyncRocksDB`] instance.
74pub struct AsyncRocksBuilder {
75    db_options: Options,
76    column_families: Vec<ColumnFamilyConfig>,
77}
78
79impl Default for AsyncRocksBuilder {
80    fn default() -> Self {
81        let mut db_options = Options::default();
82        db_options.create_if_missing(true);
83        Self {
84            db_options,
85            column_families: vec![],
86        }
87    }
88}
89
90impl AsyncRocksBuilder {
91    /// Creates a new builder with default options.
92    pub fn new() -> Self {
93        Self::default()
94    }
95
96    /// Adds a column family with the given name and default options.
97    pub fn add_column_family(mut self, name: &str) -> Self {
98        self.column_families.push(ColumnFamilyConfig::new(name));
99        self
100    }
101
102    /// Opens the database at the given path with the configured options.
103    pub async fn open<P: AsRef<Path>>(mut self, path: P) -> Result<AsyncRocksDB, AsyncRocksError> {
104        let path = path.as_ref().to_path_buf();
105
106        self.db_options.create_if_missing(true);
107
108        if !self.column_families.is_empty() {
109            self.db_options.create_missing_column_families(true);
110        }
111
112        let db = if self.column_families.is_empty() {
113            task::spawn_blocking(move || DB::open(&self.db_options, &path)).await??
114        } else {
115            let cf_descriptors: Vec<_> = self.column_families
116                .into_iter()
117                .map(|cf| ColumnFamilyDescriptor::new(cf.name, cf.options))
118                .collect();
119
120            task::spawn_blocking(move || {
121                DB::open_cf_descriptors(&self.db_options, &path, cf_descriptors)
122            })
123            .await??
124        };
125
126        Ok(AsyncRocksDB {
127            inner: Arc::new(db),
128        })
129    }
130}
131
132/// A point-in-time snapshot of the database.
133///
134/// Snapshots provide a consistent view of the data at the time they were created.
135/// In an async context with `spawn_blocking`, reads using a snapshot may see later writes
136/// due to scheduling, but they offer best-effort consistency.
137#[derive(Clone)]
138pub struct Snapshot {
139    db: Arc<DB>,
140}
141
142impl Snapshot {
143    fn new(db: Arc<DB>) -> Self {
144        Self { db }
145    }
146}
147
148/// Async-aware wrapper for RocksDB.
149///
150/// Provides non-blocking operations suitable for use in Tokio-based applications.
151/// All operations are executed via `tokio::task::spawn_blocking` to avoid blocking the async runtime.
152pub struct AsyncRocksDB {
153    inner: Arc<DB>,
154}
155
156impl AsyncRocksDB {
157    /// Opens a database with default options at the given path.
158    ///
159    /// Equivalent to `AsyncRocksBuilder::new().open(path)`.
160    pub async fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, AsyncRocksError> {
161        AsyncRocksBuilder::new().open(path).await
162    }
163
164    /// Creates a new snapshot of the current database state.
165    pub fn snapshot(&self) -> Snapshot {
166        Snapshot::new(self.inner.clone())
167    }
168
169    /// Inserts a key-value pair into the database.
170    ///
171    /// `cf` specifies the column family. Use `None` for the default column family.
172    pub async fn put<K, V>(
173        &self,
174        key: K,
175        value: V,
176        cf: Option<&str>,
177    ) -> Result<(), AsyncRocksError>
178    where
179        K: AsRef<[u8]> + Send + 'static,
180        V: AsRef<[u8]> + Send + 'static,
181    {
182        let db = self.inner.clone();
183        let key = key.as_ref().to_vec();
184        let value = value.as_ref().to_vec();
185        let cf_name = cf.map(|s| s.to_string());
186
187        task::spawn_blocking(move || {
188            let cf_name = cf_name.as_deref().unwrap_or("default");
189            let cf = db.cf_handle(cf_name)
190                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
191            db.put_cf(cf, key, value)?;
192            Ok(())
193        })
194        .await?
195    }
196
197    /// Retrieves a value by key.
198    ///
199    /// Returns `None` if the key does not exist.
200    /// Use `snapshot` for reading from a point-in-time view.
201    pub async fn get<K>(
202        &self,
203        key: K,
204        cf: Option<&str>,
205        snapshot: Option<Snapshot>,
206    ) -> Result<Option<Vec<u8>>, AsyncRocksError>
207    where
208        K: AsRef<[u8]> + Send + 'static,
209    {
210        let db = self.inner.clone();
211        let key = key.as_ref().to_vec();
212        let cf_name = cf.map(|s| s.to_string());
213
214        task::spawn_blocking(move || {
215            let cf_name = cf_name.as_deref().unwrap_or("default");
216            let cf = db.cf_handle(cf_name)
217                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
218
219            if let Some(snap) = snapshot {
220                let raw_snap = snap.db.snapshot();
221                let mut opts = ReadOptions::default();
222                opts.set_snapshot(&raw_snap);
223                db.get_cf_opt(cf, &key, &opts)
224            } else {
225                db.get_cf(cf, &key)
226            }
227            .map_err(Into::into)
228        })
229        .await?
230    }
231
232    /// Deletes a key from the database.
233    pub async fn delete<K>(
234        &self,
235        key: K,
236        cf: Option<&str>,
237    ) -> Result<(), AsyncRocksError>
238    where
239        K: AsRef<[u8]> + Send + 'static,
240    {
241        let db = self.inner.clone();
242        let key = key.as_ref().to_vec();
243        let cf_name = cf.map(|s| s.to_string());
244
245        task::spawn_blocking(move || {
246            let cf_name = cf_name.as_deref().unwrap_or("default");
247            let cf = db.cf_handle(cf_name)
248                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
249            db.delete_cf(cf, key)?;
250            Ok(())
251        })
252        .await?
253    }
254
255    /// Retrieves multiple values by keys in a single operation.
256    ///
257    /// Returns a `Vec` with the same length as `keys`, containing `Some(value)` or `None`.
258    pub async fn multi_get<K>(
259        &self,
260        keys: Vec<K>,
261        cf: Option<&str>,
262        snapshot: Option<Snapshot>,
263    ) -> Result<Vec<Option<Vec<u8>>>, AsyncRocksError>
264    where
265        K: AsRef<[u8]> + Send + 'static,
266    {
267        let db = self.inner.clone();
268        let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
269        let cf_name = cf.map(|s| s.to_string());
270
271        task::spawn_blocking(move || {
272            let cf_name = cf_name.as_deref().unwrap_or("default");
273            let cf = db.cf_handle(cf_name)
274                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
275
276            if let Some(snap) = snapshot {
277                let raw_snap = snap.db.snapshot();
278                let mut opts = ReadOptions::default();
279                opts.set_snapshot(&raw_snap);
280                db.multi_get_cf_opt(keys.iter().map(|k| (&cf, k)), &opts)
281            } else {
282                db.multi_get_cf(keys.iter().map(|k| (&cf, k)))
283            }
284            .into_iter()
285            .map(|r| r.map_err(Into::into))
286            .collect()
287        })
288        .await?
289    }
290
291    /// Deletes multiple keys in a single operation.
292    pub async fn multi_delete<K>(
293        &self,
294        keys: Vec<K>,
295        cf: Option<&str>,
296    ) -> Result<(), AsyncRocksError>
297    where
298        K: AsRef<[u8]> + Send + 'static,
299    {
300        let db = self.inner.clone();
301        let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
302        let cf_name = cf.map(|s| s.to_string());
303
304        task::spawn_blocking(move || {
305            let mut batch = rocksdb::WriteBatch::default();
306            let cf_name = cf_name.as_deref().unwrap_or("default");
307            let cf = db.cf_handle(cf_name)
308                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
309
310            for key in keys {
311                batch.delete_cf(&cf, &key);
312            }
313            db.write(batch)?;
314            Ok(())
315        })
316        .await?
317    }
318
319    /// Forces a flush of memtables to SST files for the entire database.
320    pub async fn flush(&self) -> Result<(), AsyncRocksError> {
321        let db = self.inner.clone();
322        task::spawn_blocking(move || db.flush()).await??;
323        Ok(())
324    }
325
326    /// Compacts a range of keys in the specified column family.
327    pub async fn compact_range<K: AsRef<[u8]>>(
328        &self,
329        start: Option<K>,
330        end: Option<K>,
331        cf: Option<&str>,
332    ) -> Result<(), AsyncRocksError> {
333        let db = self.inner.clone();
334        let start = start.map(|k| k.as_ref().to_vec());
335        let end = end.map(|k| k.as_ref().to_vec());
336        let cf_name = cf.map(|s| s.to_string());
337
338        task::spawn_blocking(move || {
339            let cf_name = cf_name.as_deref().unwrap_or("default");
340            let cf = db.cf_handle(cf_name)
341                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
342            db.compact_range_cf(&cf, start.as_deref(), end.as_deref());
343            Ok(())
344        })
345        .await?
346    }
347
348    /// Returns all key-value pairs in the specified column family.
349    pub async fn all(
350        &self,
351        cf: Option<&str>,
352        snapshot: Option<Snapshot>,
353    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError> {
354        let db = self.inner.clone();
355        let cf_name = cf.map(|s| s.to_string());
356
357        task::spawn_blocking(move || {
358            let cf_name = cf_name.as_deref().unwrap_or("default");
359            let cf = db.cf_handle(cf_name)
360                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
361
362            let iter = if let Some(snap) = snapshot {
363                let raw_snap = snap.db.snapshot();
364                let mut opts = ReadOptions::default();
365                opts.set_snapshot(&raw_snap);
366                db.iterator_cf_opt(cf, opts, IteratorMode::Start)
367            } else {
368                db.iterator_cf(cf, IteratorMode::Start)
369            };
370
371            iter.map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
372                .collect::<Result<Vec<_>, _>>()
373                .map_err(Into::into)
374        })
375        .await?
376    }
377
378    /// Returns all key-value pairs matching the given prefix.
379    ///
380    /// Uses RocksDB's prefix seek optimization for high performance.
381    pub async fn prefix_all<P>(
382        &self,
383        prefix: P,
384        cf: Option<&str>,
385        snapshot: Option<Snapshot>,
386    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError>
387    where
388        P: AsRef<[u8]> + Send + 'static,
389    {
390        let db = self.inner.clone();
391        let prefix = prefix.as_ref().to_vec();
392        let cf_name = cf.map(|s| s.to_string());
393
394        task::spawn_blocking(move || {
395            let cf_name = cf_name.as_deref().unwrap_or("default");
396            let cf = db.cf_handle(cf_name)
397                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
398
399            let mut opts = ReadOptions::default();
400            opts.set_prefix_same_as_start(true);
401
402            if let Some(snap) = snapshot {
403                let raw_snap = snap.db.snapshot();
404                opts.set_snapshot(&raw_snap);
405            }
406
407            let iter = db.iterator_cf_opt(
408                cf,
409                opts,
410                IteratorMode::From(&prefix, rocksdb::Direction::Forward),
411            );
412
413            iter.take_while(|r| {
414                r.as_ref()
415                    .map(|(k, _)| k.starts_with(&prefix))
416                    .unwrap_or(false)
417            })
418            .map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
419            .collect::<Result<Vec<_>, _>>()
420            .map_err(Into::into)
421        })
422        .await?
423    }
424}