1use rocksdb::{
19 DB, Options, BlockBasedOptions, Cache,
20 ColumnFamilyDescriptor, DBCompressionType, IteratorMode, ReadOptions,
21};
22use std::path::Path;
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::task;
26
/// Errors produced by the asynchronous RocksDB wrapper defined in this file.
///
/// The `#[from]` attributes let `?` convert `rocksdb::Error` and
/// `tokio::task::JoinError` values into this type.
#[derive(Error, Debug)]
pub enum AsyncRocksError {
    /// An error returned by a call into the `rocksdb` crate.
    #[error("RocksDB error: {0}")]
    Rocks(#[from] rocksdb::Error),
    /// Awaiting a task handle returned a `JoinError`.
    #[error("Task failed: {0}")]
    Join(#[from] task::JoinError),
    /// `DB::cf_handle` found no column family with the contained name.
    #[error("Column family not found: {0}")]
    ColumnFamilyNotFound(String),
}
37
/// A column family name paired with the `Options` it should be opened with.
///
/// Built with [`ColumnFamilyConfig::new`] and adjusted through the
/// by-value builder methods on the type.
pub struct ColumnFamilyConfig {
    /// Column family name handed to `ColumnFamilyDescriptor::new`.
    name: String,
    /// RocksDB options handed to `ColumnFamilyDescriptor::new`.
    options: Options,
}
43
44impl ColumnFamilyConfig {
45 pub fn new(name: &str) -> Self {
49 let mut options = Options::default();
50 options.create_if_missing(true);
51 Self {
52 name: name.to_string(),
53 options,
54 }
55 }
56
57 pub fn compression(mut self, compression: DBCompressionType) -> Self {
59 self.options.set_compression_type(compression);
60 self
61 }
62
63 pub fn block_cache_size(mut self, size_bytes: usize) -> Self {
65 let cache = Cache::new_lru_cache(size_bytes);
66 let mut block_opts = BlockBasedOptions::default();
67 block_opts.set_block_cache(&cache);
68 self.options.set_block_based_table_factory(&block_opts);
69 self
70 }
71}
72
/// Builder that collects database options and column families before
/// opening an [`AsyncRocksDB`].
///
/// NOTE(review): in the visible part of this file column families are only
/// ever added through `ColumnFamilyConfig::new(name)`, so the customisation
/// methods on `ColumnFamilyConfig` cannot be reached via this builder —
/// confirm whether an entry point accepting a prepared config is missing.
pub struct AsyncRocksBuilder {
    /// Database-wide options passed to `DB::open` / `DB::open_cf_descriptors`.
    db_options: Options,
    /// Column families to open in addition to whatever RocksDB opens itself.
    column_families: Vec<ColumnFamilyConfig>,
}
78
79impl Default for AsyncRocksBuilder {
80 fn default() -> Self {
81 let mut db_options = Options::default();
82 db_options.create_if_missing(true);
83 Self {
84 db_options,
85 column_families: vec![],
86 }
87 }
88}
89
90impl AsyncRocksBuilder {
91 pub fn new() -> Self {
93 Self::default()
94 }
95
96 pub fn add_column_family(mut self, name: &str) -> Self {
98 self.column_families.push(ColumnFamilyConfig::new(name));
99 self
100 }
101
102 pub async fn open<P: AsRef<Path>>(mut self, path: P) -> Result<AsyncRocksDB, AsyncRocksError> {
104 let path = path.as_ref().to_path_buf();
105
106 self.db_options.create_if_missing(true);
107
108 if !self.column_families.is_empty() {
109 self.db_options.create_missing_column_families(true);
110 }
111
112 let db = if self.column_families.is_empty() {
113 task::spawn_blocking(move || DB::open(&self.db_options, &path)).await??
114 } else {
115 let cf_descriptors: Vec<_> = self.column_families
116 .into_iter()
117 .map(|cf| ColumnFamilyDescriptor::new(cf.name, cf.options))
118 .collect();
119
120 task::spawn_blocking(move || {
121 DB::open_cf_descriptors(&self.db_options, &path, cf_descriptors)
122 })
123 .await??
124 };
125
126 Ok(AsyncRocksDB {
127 inner: Arc::new(db),
128 })
129 }
130}
131
/// Handle that callers pass to the read methods of `AsyncRocksDB` to request
/// a snapshot read.
///
/// The handle stores only a shared reference to the database. The read
/// methods in this file call `DB::snapshot` on that reference when the read
/// executes.
///
/// NOTE(review): no RocksDB snapshot is captured when this value is created,
/// so reads appear to observe the database as of the read, not as of the
/// moment the handle was obtained — confirm this is the intended semantics.
#[derive(Clone)]
pub struct Snapshot {
    /// Shared database handle the RocksDB snapshot is taken from at read time.
    db: Arc<DB>,
}
141
142impl Snapshot {
143 fn new(db: Arc<DB>) -> Self {
144 Self { db }
145 }
146}
147
/// Asynchronous facade over a shared `rocksdb::DB`.
///
/// The methods on this type clone the inner `Arc` and run the RocksDB call
/// inside `tokio::task::spawn_blocking`.
pub struct AsyncRocksDB {
    /// The underlying database, shared with blocking tasks and `Snapshot`s.
    inner: Arc<DB>,
}
155
156impl AsyncRocksDB {
157 pub async fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, AsyncRocksError> {
161 AsyncRocksBuilder::new().open(path).await
162 }
163
164 pub fn snapshot(&self) -> Snapshot {
166 Snapshot::new(self.inner.clone())
167 }
168
169 pub async fn put<K, V>(
173 &self,
174 key: K,
175 value: V,
176 cf: Option<&str>,
177 ) -> Result<(), AsyncRocksError>
178 where
179 K: AsRef<[u8]> + Send + 'static,
180 V: AsRef<[u8]> + Send + 'static,
181 {
182 let db = self.inner.clone();
183 let key = key.as_ref().to_vec();
184 let value = value.as_ref().to_vec();
185 let cf_name = cf.map(|s| s.to_string());
186
187 task::spawn_blocking(move || {
188 let cf_name = cf_name.as_deref().unwrap_or("default");
189 let cf = db.cf_handle(cf_name)
190 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
191 db.put_cf(cf, key, value)?;
192 Ok(())
193 })
194 .await?
195 }
196
197 pub async fn get<K>(
202 &self,
203 key: K,
204 cf: Option<&str>,
205 snapshot: Option<Snapshot>,
206 ) -> Result<Option<Vec<u8>>, AsyncRocksError>
207 where
208 K: AsRef<[u8]> + Send + 'static,
209 {
210 let db = self.inner.clone();
211 let key = key.as_ref().to_vec();
212 let cf_name = cf.map(|s| s.to_string());
213
214 task::spawn_blocking(move || {
215 let cf_name = cf_name.as_deref().unwrap_or("default");
216 let cf = db.cf_handle(cf_name)
217 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
218
219 if let Some(snap) = snapshot {
220 let raw_snap = snap.db.snapshot();
221 let mut opts = ReadOptions::default();
222 opts.set_snapshot(&raw_snap);
223 db.get_cf_opt(cf, &key, &opts)
224 } else {
225 db.get_cf(cf, &key)
226 }
227 .map_err(Into::into)
228 })
229 .await?
230 }
231
232 pub async fn delete<K>(
234 &self,
235 key: K,
236 cf: Option<&str>,
237 ) -> Result<(), AsyncRocksError>
238 where
239 K: AsRef<[u8]> + Send + 'static,
240 {
241 let db = self.inner.clone();
242 let key = key.as_ref().to_vec();
243 let cf_name = cf.map(|s| s.to_string());
244
245 task::spawn_blocking(move || {
246 let cf_name = cf_name.as_deref().unwrap_or("default");
247 let cf = db.cf_handle(cf_name)
248 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
249 db.delete_cf(cf, key)?;
250 Ok(())
251 })
252 .await?
253 }
254
255 pub async fn multi_get<K>(
259 &self,
260 keys: Vec<K>,
261 cf: Option<&str>,
262 snapshot: Option<Snapshot>,
263 ) -> Result<Vec<Option<Vec<u8>>>, AsyncRocksError>
264 where
265 K: AsRef<[u8]> + Send + 'static,
266 {
267 let db = self.inner.clone();
268 let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
269 let cf_name = cf.map(|s| s.to_string());
270
271 task::spawn_blocking(move || {
272 let cf_name = cf_name.as_deref().unwrap_or("default");
273 let cf = db.cf_handle(cf_name)
274 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
275
276 if let Some(snap) = snapshot {
277 let raw_snap = snap.db.snapshot();
278 let mut opts = ReadOptions::default();
279 opts.set_snapshot(&raw_snap);
280 db.multi_get_cf_opt(keys.iter().map(|k| (&cf, k)), &opts)
281 } else {
282 db.multi_get_cf(keys.iter().map(|k| (&cf, k)))
283 }
284 .into_iter()
285 .map(|r| r.map_err(Into::into))
286 .collect()
287 })
288 .await?
289 }
290
291 pub async fn multi_delete<K>(
293 &self,
294 keys: Vec<K>,
295 cf: Option<&str>,
296 ) -> Result<(), AsyncRocksError>
297 where
298 K: AsRef<[u8]> + Send + 'static,
299 {
300 let db = self.inner.clone();
301 let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
302 let cf_name = cf.map(|s| s.to_string());
303
304 task::spawn_blocking(move || {
305 let mut batch = rocksdb::WriteBatch::default();
306 let cf_name = cf_name.as_deref().unwrap_or("default");
307 let cf = db.cf_handle(cf_name)
308 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
309
310 for key in keys {
311 batch.delete_cf(&cf, &key);
312 }
313 db.write(batch)?;
314 Ok(())
315 })
316 .await?
317 }
318
319 pub async fn flush(&self) -> Result<(), AsyncRocksError> {
321 let db = self.inner.clone();
322 task::spawn_blocking(move || db.flush()).await??;
323 Ok(())
324 }
325
326 pub async fn compact_range<K: AsRef<[u8]>>(
328 &self,
329 start: Option<K>,
330 end: Option<K>,
331 cf: Option<&str>,
332 ) -> Result<(), AsyncRocksError> {
333 let db = self.inner.clone();
334 let start = start.map(|k| k.as_ref().to_vec());
335 let end = end.map(|k| k.as_ref().to_vec());
336 let cf_name = cf.map(|s| s.to_string());
337
338 task::spawn_blocking(move || {
339 let cf_name = cf_name.as_deref().unwrap_or("default");
340 let cf = db.cf_handle(cf_name)
341 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
342 db.compact_range_cf(&cf, start.as_deref(), end.as_deref());
343 Ok(())
344 })
345 .await?
346 }
347
348 pub async fn all(
350 &self,
351 cf: Option<&str>,
352 snapshot: Option<Snapshot>,
353 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError> {
354 let db = self.inner.clone();
355 let cf_name = cf.map(|s| s.to_string());
356
357 task::spawn_blocking(move || {
358 let cf_name = cf_name.as_deref().unwrap_or("default");
359 let cf = db.cf_handle(cf_name)
360 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
361
362 let iter = if let Some(snap) = snapshot {
363 let raw_snap = snap.db.snapshot();
364 let mut opts = ReadOptions::default();
365 opts.set_snapshot(&raw_snap);
366 db.iterator_cf_opt(cf, opts, IteratorMode::Start)
367 } else {
368 db.iterator_cf(cf, IteratorMode::Start)
369 };
370
371 iter.map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
372 .collect::<Result<Vec<_>, _>>()
373 .map_err(Into::into)
374 })
375 .await?
376 }
377
378 pub async fn prefix_all<P>(
382 &self,
383 prefix: P,
384 cf: Option<&str>,
385 snapshot: Option<Snapshot>,
386 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError>
387 where
388 P: AsRef<[u8]> + Send + 'static,
389 {
390 let db = self.inner.clone();
391 let prefix = prefix.as_ref().to_vec();
392 let cf_name = cf.map(|s| s.to_string());
393
394 task::spawn_blocking(move || {
395 let cf_name = cf_name.as_deref().unwrap_or("default");
396 let cf = db.cf_handle(cf_name)
397 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
398
399 let mut opts = ReadOptions::default();
400 opts.set_prefix_same_as_start(true);
401
402 if let Some(snap) = snapshot {
403 let raw_snap = snap.db.snapshot();
404 opts.set_snapshot(&raw_snap);
405 }
406
407 let iter = db.iterator_cf_opt(
408 cf,
409 opts,
410 IteratorMode::From(&prefix, rocksdb::Direction::Forward),
411 );
412
413 iter.take_while(|r| {
414 r.as_ref()
415 .map(|(k, _)| k.starts_with(&prefix))
416 .unwrap_or(false)
417 })
418 .map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
419 .collect::<Result<Vec<_>, _>>()
420 .map_err(Into::into)
421 })
422 .await?
423 }
424}