async_rocksdb/
lib.rs

1// src/lib.rs
2
3//! # async-rocksdb
4//!
5//! An ergonomic, async wrapper for RocksDB in Rust.
6//!
7//! Provides non-blocking operations suitable for Tokio-based applications.
8//!
9//! ## Important Note on Column Families
10//!
11//! When you use `AsyncRocksBuilder::add_column_family`, the database is opened in "explicit column family mode".
12//! In this mode, the **default column family is not created automatically**.
13//!
14//! If you want to use `cf: None` (which maps to "default"), you **must** explicitly add it:
15//!
16//! ```rust
17//! use async_rocksdb::AsyncRocksBuilder;
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//!     let path = "/tmp/async-rocksdb-doctest-cf";
22//!     let _ = std::fs::remove_dir_all(path); // cleanup
23//!
24//!     let db = AsyncRocksBuilder::new()
25//!         .add_column_family("default")  // required for cf: None
26//!         .add_column_family("users")
27//!         .open(path)
28//!         .await?;
29//!
30//!     db.put(b"key", b"value", None).await?;  // works
31//!     Ok(())
32//! }
33//! ```
34//!
35//! If you don't add any column families, the default one is created automatically and `cf: None` works.
36//!
37//! ## Features
38//!
39//! - Async reads/writes
40//! - Column families
41//! - Snapshots
42//! - Prefix scans
43//! - Batch operations
44//! - Manual flush/compaction
45
46use rocksdb::{
47    DB, Options, BlockBasedOptions, Cache,
48    ColumnFamilyDescriptor, DBCompressionType, IteratorMode, ReadOptions,
49};
50use std::path::Path;
51use std::sync::Arc;
52use thiserror::Error;
53use tokio::task;
54
55/// Error type for async-rocksdb operations.
56#[derive(Error, Debug)]
57pub enum AsyncRocksError {
58    #[error("RocksDB error: {0}")]
59    Rocks(#[from] rocksdb::Error),
60    #[error("Task failed: {0}")]
61    Join(#[from] task::JoinError),
62    #[error("Column family not found: {0}")]
63    ColumnFamilyNotFound(String),
64}
65
66/// Configuration for a single column family.
67pub struct ColumnFamilyConfig {
68    name: String,
69    options: Options,
70}
71
72impl ColumnFamilyConfig {
73    pub fn new(name: &str) -> Self {
74        let mut options = Options::default();
75        options.create_if_missing(true);
76        Self {
77            name: name.to_string(),
78            options,
79        }
80    }
81
82    pub fn compression(mut self, compression: DBCompressionType) -> Self {
83        self.options.set_compression_type(compression);
84        self
85    }
86
87    pub fn block_cache_size(mut self, size_bytes: usize) -> Self {
88        let cache = Cache::new_lru_cache(size_bytes);
89        let mut block_opts = BlockBasedOptions::default();
90        block_opts.set_block_cache(&cache);
91        self.options.set_block_based_table_factory(&block_opts);
92        self
93    }
94}
95
96/// Builder for configuring and opening an [`AsyncRocksDB`] instance.
97pub struct AsyncRocksBuilder {
98    db_options: Options,
99    column_families: Vec<ColumnFamilyConfig>,
100}
101
102impl Default for AsyncRocksBuilder {
103    fn default() -> Self {
104        let mut db_options = Options::default();
105        db_options.create_if_missing(true);
106        Self {
107            db_options,
108            column_families: vec![],
109        }
110    }
111}
112
113impl AsyncRocksBuilder {
114    pub fn new() -> Self {
115        Self::default()
116    }
117
118    pub fn add_column_family(mut self, name: &str) -> Self {
119        self.column_families.push(ColumnFamilyConfig::new(name));
120        self
121    }
122
123    pub async fn open<P: AsRef<Path>>(mut self, path: P) -> Result<AsyncRocksDB, AsyncRocksError> {
124        let path = path.as_ref().to_path_buf();
125
126        self.db_options.create_if_missing(true);
127
128        if !self.column_families.is_empty() {
129            self.db_options.create_missing_column_families(true);
130        }
131
132        let db = if self.column_families.is_empty() {
133            task::spawn_blocking(move || DB::open(&self.db_options, &path)).await??
134        } else {
135            let cf_descriptors: Vec<_> = self.column_families
136                .into_iter()
137                .map(|cf| ColumnFamilyDescriptor::new(cf.name, cf.options))
138                .collect();
139
140            task::spawn_blocking(move || {
141                DB::open_cf_descriptors(&self.db_options, &path, cf_descriptors)
142            })
143            .await??
144        };
145
146        Ok(AsyncRocksDB {
147            inner: Arc::new(db),
148        })
149    }
150}
151
152/// A point-in-time snapshot of the database.
153#[derive(Clone)]
154pub struct Snapshot {
155    db: Arc<DB>,
156}
157
158impl Snapshot {
159    fn new(db: Arc<DB>) -> Self {
160        Self { db }
161    }
162}
163
164/// Async RocksDB wrapper.
165pub struct AsyncRocksDB {
166    inner: Arc<DB>,
167}
168
169impl AsyncRocksDB {
170    pub async fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, AsyncRocksError> {
171        AsyncRocksBuilder::new().open(path).await
172    }
173
174    pub fn snapshot(&self) -> Snapshot {
175        Snapshot::new(self.inner.clone())
176    }
177
178    pub async fn put<K, V>(
179        &self,
180        key: K,
181        value: V,
182        cf: Option<&str>,
183    ) -> Result<(), AsyncRocksError>
184    where
185        K: AsRef<[u8]> + Send + 'static,
186        V: AsRef<[u8]> + Send + 'static,
187    {
188        let db = self.inner.clone();
189        let key = key.as_ref().to_vec();
190        let value = value.as_ref().to_vec();
191        let cf_name = cf.map(|s| s.to_string());
192
193        task::spawn_blocking(move || {
194            let cf_name = cf_name.as_deref().unwrap_or("default");
195            let cf = db.cf_handle(cf_name)
196                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
197            db.put_cf(cf, key, value)?;
198            Ok(())
199        })
200        .await?
201    }
202
203    pub async fn get<K>(
204        &self,
205        key: K,
206        cf: Option<&str>,
207        snapshot: Option<Snapshot>,
208    ) -> Result<Option<Vec<u8>>, AsyncRocksError>
209    where
210        K: AsRef<[u8]> + Send + 'static,
211    {
212        let db = self.inner.clone();
213        let key = key.as_ref().to_vec();
214        let cf_name = cf.map(|s| s.to_string());
215
216        task::spawn_blocking(move || {
217            let cf_name = cf_name.as_deref().unwrap_or("default");
218            let cf = db.cf_handle(cf_name)
219                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
220
221            if let Some(snap) = snapshot {
222                let raw_snap = snap.db.snapshot();
223                let mut opts = ReadOptions::default();
224                opts.set_snapshot(&raw_snap);
225                db.get_cf_opt(cf, &key, &opts)
226            } else {
227                db.get_cf(cf, &key)
228            }
229            .map_err(Into::into)
230        })
231        .await?
232    }
233
234    pub async fn delete<K>(
235        &self,
236        key: K,
237        cf: Option<&str>,
238    ) -> Result<(), AsyncRocksError>
239    where
240        K: AsRef<[u8]> + Send + 'static,
241    {
242        let db = self.inner.clone();
243        let key = key.as_ref().to_vec();
244        let cf_name = cf.map(|s| s.to_string());
245
246        task::spawn_blocking(move || {
247            let cf_name = cf_name.as_deref().unwrap_or("default");
248            let cf = db.cf_handle(cf_name)
249                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
250            db.delete_cf(cf, key)?;
251            Ok(())
252        })
253        .await?
254    }
255
256    pub async fn multi_get<K>(
257        &self,
258        keys: Vec<K>,
259        cf: Option<&str>,
260        snapshot: Option<Snapshot>,
261    ) -> Result<Vec<Option<Vec<u8>>>, AsyncRocksError>
262    where
263        K: AsRef<[u8]> + Send + 'static,
264    {
265        let db = self.inner.clone();
266        let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
267        let cf_name = cf.map(|s| s.to_string());
268
269        task::spawn_blocking(move || {
270            let cf_name = cf_name.as_deref().unwrap_or("default");
271            let cf = db.cf_handle(cf_name)
272                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
273
274            if let Some(snap) = snapshot {
275                let raw_snap = snap.db.snapshot();
276                let mut opts = ReadOptions::default();
277                opts.set_snapshot(&raw_snap);
278                db.multi_get_cf_opt(keys.iter().map(|k| (&cf, k)), &opts)
279            } else {
280                db.multi_get_cf(keys.iter().map(|k| (&cf, k)))
281            }
282            .into_iter()
283            .map(|r| r.map_err(Into::into))
284            .collect()
285        })
286        .await?
287    }
288
289    pub async fn multi_delete<K>(
290        &self,
291        keys: Vec<K>,
292        cf: Option<&str>,
293    ) -> Result<(), AsyncRocksError>
294    where
295        K: AsRef<[u8]> + Send + 'static,
296    {
297        let db = self.inner.clone();
298        let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
299        let cf_name = cf.map(|s| s.to_string());
300
301        task::spawn_blocking(move || {
302            let mut batch = rocksdb::WriteBatch::default();
303            let cf_name = cf_name.as_deref().unwrap_or("default");
304            let cf = db.cf_handle(cf_name)
305                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
306
307            for key in keys {
308                batch.delete_cf(&cf, &key);
309            }
310            db.write(batch)?;
311            Ok(())
312        })
313        .await?
314    }
315
316    pub async fn flush(&self) -> Result<(), AsyncRocksError> {
317        let db = self.inner.clone();
318        task::spawn_blocking(move || db.flush()).await??;
319        Ok(())
320    }
321
322    pub async fn compact_range<K: AsRef<[u8]>>(
323        &self,
324        start: Option<K>,
325        end: Option<K>,
326        cf: Option<&str>,
327    ) -> Result<(), AsyncRocksError> {
328        let db = self.inner.clone();
329        let start = start.map(|k| k.as_ref().to_vec());
330        let end = end.map(|k| k.as_ref().to_vec());
331        let cf_name = cf.map(|s| s.to_string());
332
333        task::spawn_blocking(move || {
334            let cf_name = cf_name.as_deref().unwrap_or("default");
335            let cf = db.cf_handle(cf_name)
336                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
337            db.compact_range_cf(&cf, start.as_deref(), end.as_deref());
338            Ok(())
339        })
340        .await?
341    }
342
343    pub async fn all(
344        &self,
345        cf: Option<&str>,
346        snapshot: Option<Snapshot>,
347    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError> {
348        let db = self.inner.clone();
349        let cf_name = cf.map(|s| s.to_string());
350
351        task::spawn_blocking(move || {
352            let cf_name = cf_name.as_deref().unwrap_or("default");
353            let cf = db.cf_handle(cf_name)
354                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
355
356            let iter = if let Some(snap) = snapshot {
357                let raw_snap = snap.db.snapshot();
358                let mut opts = ReadOptions::default();
359                opts.set_snapshot(&raw_snap);
360                db.iterator_cf_opt(cf, opts, IteratorMode::Start)
361            } else {
362                db.iterator_cf(cf, IteratorMode::Start)
363            };
364
365            iter.map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
366                .collect::<Result<Vec<_>, _>>()
367                .map_err(Into::into)
368        })
369        .await?
370    }
371
372    pub async fn prefix_all<P>(
373        &self,
374        prefix: P,
375        cf: Option<&str>,
376        snapshot: Option<Snapshot>,
377    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError>
378    where
379        P: AsRef<[u8]> + Send + 'static,
380    {
381        let db = self.inner.clone();
382        let prefix = prefix.as_ref().to_vec();
383        let cf_name = cf.map(|s| s.to_string());
384
385        task::spawn_blocking(move || {
386            let cf_name = cf_name.as_deref().unwrap_or("default");
387            let cf = db.cf_handle(cf_name)
388                .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
389
390            let mut opts = ReadOptions::default();
391            opts.set_prefix_same_as_start(true);
392
393            if let Some(snap) = snapshot {
394                let raw_snap = snap.db.snapshot();
395                opts.set_snapshot(&raw_snap);
396            }
397
398            let iter = db.iterator_cf_opt(
399                cf,
400                opts,
401                IteratorMode::From(&prefix, rocksdb::Direction::Forward),
402            );
403
404            iter.take_while(|r| {
405                r.as_ref()
406                    .map(|(k, _)| k.starts_with(&prefix))
407                    .unwrap_or(false)
408            })
409            .map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
410            .collect::<Result<Vec<_>, _>>()
411            .map_err(Into::into)
412        })
413        .await?
414    }
415}