pulse_state/
rocks.rs

1#![cfg(feature = "rocksdb")]
2use async_trait::async_trait;
3use pulse_core::{KvState, Result, SnapshotId};
4
5/// A RocksDB-backed `KvState` (enable with `--features rocksdb`).
6pub struct RocksDbState {
7    db: rocksdb::DB,
8}
9
10impl RocksDbState {
11    pub fn open(path: &str) -> Result<Self> {
12        let db = rocksdb::DB::open_default(path)?;
13        Ok(Self { db })
14    }
15}
16
17#[async_trait]
18impl KvState for RocksDbState {
19    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
20        Ok(self.db.get(key)?)
21    }
22    async fn put(&self, key: &[u8], value: Vec<u8>) -> Result<()> {
23        self.db.put(key, value)?;
24        Ok(())
25    }
26    async fn delete(&self, key: &[u8]) -> Result<()> {
27        self.db.delete(key)?;
28        Ok(())
29    }
30    async fn iter_prefix(&self, prefix: Option<&[u8]>) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
31        use rocksdb::DBIterator;
32        let mut out = Vec::new();
33        match prefix {
34            Some(p) => {
35                let mode = rocksdb::IteratorMode::From(p, rocksdb::Direction::Forward);
36                let iter: DBIterator<'_> = self.db.iterator(mode);
37                for item in iter {
38                    let (k, v) = item?;
39                    if k.starts_with(p) {
40                        out.push((k.to_vec(), v.to_vec()));
41                    } else {
42                        break;
43                    }
44                }
45            }
46            None => {
47                for item in self.db.iterator(rocksdb::IteratorMode::Start) {
48                    let (k, v) = item?;
49                    out.push((k.to_vec(), v.to_vec()));
50                }
51            }
52        }
53        Ok(out)
54    }
55    async fn snapshot(&self) -> Result<SnapshotId> {
56        // For simplicity: create a checkpoint directory name and return it; caller responsible to persist mapping
57        // Real-world would use rocksdb::checkpoint::Checkpoint
58        use std::time::{SystemTime, UNIX_EPOCH};
59        let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
60        let id = format!("rocks-{}", ts);
61        let cp = rocksdb::checkpoint::Checkpoint::new(&self.db)?;
62        let dir = format!("_cp_{}", &id);
63        cp.create_checkpoint(&dir)?;
64        Ok(id)
65    }
66    async fn restore(&self, _snapshot: SnapshotId) -> Result<()> {
67        // Not supported in-place for an open DB; in practice, one would reopen from checkpoint dir.
68        // We no-op here and document the limitation for this demo.
69        Ok(())
70    }
71}