1use rocksdb::{
47 DB, Options, BlockBasedOptions, Cache,
48 ColumnFamilyDescriptor, DBCompressionType, IteratorMode, ReadOptions,
49};
50use std::path::Path;
51use std::sync::Arc;
52use thiserror::Error;
53use tokio::task;
54
55#[derive(Error, Debug)]
57pub enum AsyncRocksError {
58 #[error("RocksDB error: {0}")]
59 Rocks(#[from] rocksdb::Error),
60 #[error("Task failed: {0}")]
61 Join(#[from] task::JoinError),
62 #[error("Column family not found: {0}")]
63 ColumnFamilyNotFound(String),
64}
65
66pub struct ColumnFamilyConfig {
68 name: String,
69 options: Options,
70}
71
72impl ColumnFamilyConfig {
73 pub fn new(name: &str) -> Self {
74 let mut options = Options::default();
75 options.create_if_missing(true);
76 Self {
77 name: name.to_string(),
78 options,
79 }
80 }
81
82 pub fn compression(mut self, compression: DBCompressionType) -> Self {
83 self.options.set_compression_type(compression);
84 self
85 }
86
87 pub fn block_cache_size(mut self, size_bytes: usize) -> Self {
88 let cache = Cache::new_lru_cache(size_bytes);
89 let mut block_opts = BlockBasedOptions::default();
90 block_opts.set_block_cache(&cache);
91 self.options.set_block_based_table_factory(&block_opts);
92 self
93 }
94}
95
96pub struct AsyncRocksBuilder {
98 db_options: Options,
99 column_families: Vec<ColumnFamilyConfig>,
100}
101
102impl Default for AsyncRocksBuilder {
103 fn default() -> Self {
104 let mut db_options = Options::default();
105 db_options.create_if_missing(true);
106 Self {
107 db_options,
108 column_families: vec![],
109 }
110 }
111}
112
113impl AsyncRocksBuilder {
114 pub fn new() -> Self {
115 Self::default()
116 }
117
118 pub fn add_column_family(mut self, name: &str) -> Self {
119 self.column_families.push(ColumnFamilyConfig::new(name));
120 self
121 }
122
123 pub async fn open<P: AsRef<Path>>(mut self, path: P) -> Result<AsyncRocksDB, AsyncRocksError> {
124 let path = path.as_ref().to_path_buf();
125
126 self.db_options.create_if_missing(true);
127
128 if !self.column_families.is_empty() {
129 self.db_options.create_missing_column_families(true);
130 }
131
132 let db = if self.column_families.is_empty() {
133 task::spawn_blocking(move || DB::open(&self.db_options, &path)).await??
134 } else {
135 let cf_descriptors: Vec<_> = self.column_families
136 .into_iter()
137 .map(|cf| ColumnFamilyDescriptor::new(cf.name, cf.options))
138 .collect();
139
140 task::spawn_blocking(move || {
141 DB::open_cf_descriptors(&self.db_options, &path, cf_descriptors)
142 })
143 .await??
144 };
145
146 Ok(AsyncRocksDB {
147 inner: Arc::new(db),
148 })
149 }
150}
151
152#[derive(Clone)]
154pub struct Snapshot {
155 db: Arc<DB>,
156}
157
158impl Snapshot {
159 fn new(db: Arc<DB>) -> Self {
160 Self { db }
161 }
162}
163
164pub struct AsyncRocksDB {
166 inner: Arc<DB>,
167}
168
169impl AsyncRocksDB {
170 pub async fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, AsyncRocksError> {
171 AsyncRocksBuilder::new().open(path).await
172 }
173
174 pub fn snapshot(&self) -> Snapshot {
175 Snapshot::new(self.inner.clone())
176 }
177
178 pub async fn put<K, V>(
179 &self,
180 key: K,
181 value: V,
182 cf: Option<&str>,
183 ) -> Result<(), AsyncRocksError>
184 where
185 K: AsRef<[u8]> + Send + 'static,
186 V: AsRef<[u8]> + Send + 'static,
187 {
188 let db = self.inner.clone();
189 let key = key.as_ref().to_vec();
190 let value = value.as_ref().to_vec();
191 let cf_name = cf.map(|s| s.to_string());
192
193 task::spawn_blocking(move || {
194 let cf_name = cf_name.as_deref().unwrap_or("default");
195 let cf = db.cf_handle(cf_name)
196 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
197 db.put_cf(cf, key, value)?;
198 Ok(())
199 })
200 .await?
201 }
202
203 pub async fn get<K>(
204 &self,
205 key: K,
206 cf: Option<&str>,
207 snapshot: Option<Snapshot>,
208 ) -> Result<Option<Vec<u8>>, AsyncRocksError>
209 where
210 K: AsRef<[u8]> + Send + 'static,
211 {
212 let db = self.inner.clone();
213 let key = key.as_ref().to_vec();
214 let cf_name = cf.map(|s| s.to_string());
215
216 task::spawn_blocking(move || {
217 let cf_name = cf_name.as_deref().unwrap_or("default");
218 let cf = db.cf_handle(cf_name)
219 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
220
221 if let Some(snap) = snapshot {
222 let raw_snap = snap.db.snapshot();
223 let mut opts = ReadOptions::default();
224 opts.set_snapshot(&raw_snap);
225 db.get_cf_opt(cf, &key, &opts)
226 } else {
227 db.get_cf(cf, &key)
228 }
229 .map_err(Into::into)
230 })
231 .await?
232 }
233
234 pub async fn delete<K>(
235 &self,
236 key: K,
237 cf: Option<&str>,
238 ) -> Result<(), AsyncRocksError>
239 where
240 K: AsRef<[u8]> + Send + 'static,
241 {
242 let db = self.inner.clone();
243 let key = key.as_ref().to_vec();
244 let cf_name = cf.map(|s| s.to_string());
245
246 task::spawn_blocking(move || {
247 let cf_name = cf_name.as_deref().unwrap_or("default");
248 let cf = db.cf_handle(cf_name)
249 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
250 db.delete_cf(cf, key)?;
251 Ok(())
252 })
253 .await?
254 }
255
256 pub async fn multi_get<K>(
257 &self,
258 keys: Vec<K>,
259 cf: Option<&str>,
260 snapshot: Option<Snapshot>,
261 ) -> Result<Vec<Option<Vec<u8>>>, AsyncRocksError>
262 where
263 K: AsRef<[u8]> + Send + 'static,
264 {
265 let db = self.inner.clone();
266 let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
267 let cf_name = cf.map(|s| s.to_string());
268
269 task::spawn_blocking(move || {
270 let cf_name = cf_name.as_deref().unwrap_or("default");
271 let cf = db.cf_handle(cf_name)
272 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
273
274 if let Some(snap) = snapshot {
275 let raw_snap = snap.db.snapshot();
276 let mut opts = ReadOptions::default();
277 opts.set_snapshot(&raw_snap);
278 db.multi_get_cf_opt(keys.iter().map(|k| (&cf, k)), &opts)
279 } else {
280 db.multi_get_cf(keys.iter().map(|k| (&cf, k)))
281 }
282 .into_iter()
283 .map(|r| r.map_err(Into::into))
284 .collect()
285 })
286 .await?
287 }
288
289 pub async fn multi_delete<K>(
290 &self,
291 keys: Vec<K>,
292 cf: Option<&str>,
293 ) -> Result<(), AsyncRocksError>
294 where
295 K: AsRef<[u8]> + Send + 'static,
296 {
297 let db = self.inner.clone();
298 let keys: Vec<Vec<u8>> = keys.into_iter().map(|k| k.as_ref().to_vec()).collect();
299 let cf_name = cf.map(|s| s.to_string());
300
301 task::spawn_blocking(move || {
302 let mut batch = rocksdb::WriteBatch::default();
303 let cf_name = cf_name.as_deref().unwrap_or("default");
304 let cf = db.cf_handle(cf_name)
305 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
306
307 for key in keys {
308 batch.delete_cf(&cf, &key);
309 }
310 db.write(batch)?;
311 Ok(())
312 })
313 .await?
314 }
315
316 pub async fn flush(&self) -> Result<(), AsyncRocksError> {
317 let db = self.inner.clone();
318 task::spawn_blocking(move || db.flush()).await??;
319 Ok(())
320 }
321
322 pub async fn compact_range<K: AsRef<[u8]>>(
323 &self,
324 start: Option<K>,
325 end: Option<K>,
326 cf: Option<&str>,
327 ) -> Result<(), AsyncRocksError> {
328 let db = self.inner.clone();
329 let start = start.map(|k| k.as_ref().to_vec());
330 let end = end.map(|k| k.as_ref().to_vec());
331 let cf_name = cf.map(|s| s.to_string());
332
333 task::spawn_blocking(move || {
334 let cf_name = cf_name.as_deref().unwrap_or("default");
335 let cf = db.cf_handle(cf_name)
336 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
337 db.compact_range_cf(&cf, start.as_deref(), end.as_deref());
338 Ok(())
339 })
340 .await?
341 }
342
343 pub async fn all(
344 &self,
345 cf: Option<&str>,
346 snapshot: Option<Snapshot>,
347 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError> {
348 let db = self.inner.clone();
349 let cf_name = cf.map(|s| s.to_string());
350
351 task::spawn_blocking(move || {
352 let cf_name = cf_name.as_deref().unwrap_or("default");
353 let cf = db.cf_handle(cf_name)
354 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
355
356 let iter = if let Some(snap) = snapshot {
357 let raw_snap = snap.db.snapshot();
358 let mut opts = ReadOptions::default();
359 opts.set_snapshot(&raw_snap);
360 db.iterator_cf_opt(cf, opts, IteratorMode::Start)
361 } else {
362 db.iterator_cf(cf, IteratorMode::Start)
363 };
364
365 iter.map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
366 .collect::<Result<Vec<_>, _>>()
367 .map_err(Into::into)
368 })
369 .await?
370 }
371
372 pub async fn prefix_all<P>(
373 &self,
374 prefix: P,
375 cf: Option<&str>,
376 snapshot: Option<Snapshot>,
377 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>, AsyncRocksError>
378 where
379 P: AsRef<[u8]> + Send + 'static,
380 {
381 let db = self.inner.clone();
382 let prefix = prefix.as_ref().to_vec();
383 let cf_name = cf.map(|s| s.to_string());
384
385 task::spawn_blocking(move || {
386 let cf_name = cf_name.as_deref().unwrap_or("default");
387 let cf = db.cf_handle(cf_name)
388 .ok_or(AsyncRocksError::ColumnFamilyNotFound(cf_name.to_string()))?;
389
390 let mut opts = ReadOptions::default();
391 opts.set_prefix_same_as_start(true);
392
393 if let Some(snap) = snapshot {
394 let raw_snap = snap.db.snapshot();
395 opts.set_snapshot(&raw_snap);
396 }
397
398 let iter = db.iterator_cf_opt(
399 cf,
400 opts,
401 IteratorMode::From(&prefix, rocksdb::Direction::Forward),
402 );
403
404 iter.take_while(|r| {
405 r.as_ref()
406 .map(|(k, _)| k.starts_with(&prefix))
407 .unwrap_or(false)
408 })
409 .map(|r| r.map(|(k, v)| (k.to_vec(), v.to_vec())))
410 .collect::<Result<Vec<_>, _>>()
411 .map_err(Into::into)
412 })
413 .await?
414 }
415}