1#![cfg(feature = "rocksdb")]
2use async_trait::async_trait;
3use pulse_core::{KvState, Result, SnapshotId};
4
5pub struct RocksDbState {
7 db: rocksdb::DB,
8}
9
10impl RocksDbState {
11 pub fn open(path: &str) -> Result<Self> {
12 let db = rocksdb::DB::open_default(path)?;
13 Ok(Self { db })
14 }
15}
16
17#[async_trait]
18impl KvState for RocksDbState {
19 async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
20 Ok(self.db.get(key)?)
21 }
22 async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
23 self.db.put(key, value)?;
24 Ok(())
25 }
26 async fn delete(&self, key: &[u8]) -> Result<()> {
27 self.db.delete(key)?;
28 Ok(())
29 }
30 async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
31 use rocksdb::DBIterator;
32 let mut out = Vec::new();
33 match prefix {
34 Some(p) => {
35 let mode = rocksdb::IteratorMode::From(p, rocksdb::Direction::Forward);
36 let iter: DBIterator<'_> = self.db.iterator(mode);
37 for item in iter {
38 let (k, v) = item?;
39 if k.starts_with(p) {
40 out.push((k.to_vec(), v.to_vec()));
41 } else {
42 break;
43 }
44 }
45 }
46 None => {
47 for item in self.db.iterator(rocksdb::IteratorMode::Start) {
48 let (k, v) = item?;
49 out.push((k.to_vec(), v.to_vec()));
50 }
51 }
52 }
53 Ok(out)
54 }
55 async fn snapshot(&self) -> Result<SnapshotId> {
56 use std::time::{SystemTime, UNIX_EPOCH};
59 let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
60 let id = format!("rocks-{}", ts);
61 let cp = rocksdb::checkpoint::Checkpoint::new(&self.db)?;
62 let dir = format!("_cp_{}", &id);
63 cp.create_checkpoint(&dir)?;
64 Ok(id)
65 }
66 async fn restore(&self, _snapshot: SnapshotId) -> Result<()> {
67 Ok(())
70 }
71}