calimero_store_rocksdb/
lib.rs1#[cfg(test)]
52mod tests;
53
54use calimero_store::config::StoreConfig;
55use calimero_store::db::{Column, Database};
56use calimero_store::iter::{DBIter, Iter};
57use calimero_store::slice::Slice;
58use calimero_store::tx::{Operation, Transaction};
59use eyre::{bail, Result as EyreResult};
60use rocksdb::{
61 ColumnFamily, DBRawIteratorWithThreadMode, Options, ReadOptions, Snapshot, WriteBatch, DB,
62};
63use strum::IntoEnumIterator;
64
65const DEFAULT_MAX_OPEN_FILES: i32 = 256;
70
71const DEFAULT_BLOCK_CACHE_SIZE: usize = 128 * 1024 * 1024;
76
77#[derive(Debug)]
99pub struct RocksDB {
100 db: DB,
101}
102
103impl RocksDB {
104 fn cf_handle(&self, column: Column) -> Option<&ColumnFamily> {
105 self.db.cf_handle(column.as_ref())
106 }
107
108 fn try_cf_handle(&self, column: Column) -> EyreResult<&ColumnFamily> {
109 let Some(cf_handle) = self.cf_handle(column) else {
110 bail!("unknown column family: {:?}", column);
111 };
112
113 Ok(cf_handle)
114 }
115}
116
117impl Database<'_> for RocksDB {
118 fn open(config: &StoreConfig) -> EyreResult<Self> {
119 let mut options = Options::default();
120
121 options.create_if_missing(true);
122 options.create_missing_column_families(true);
123
124 options.set_max_open_files(DEFAULT_MAX_OPEN_FILES);
128
129 let cache = rocksdb::Cache::new_lru_cache(DEFAULT_BLOCK_CACHE_SIZE);
132 let mut block_opts = rocksdb::BlockBasedOptions::default();
133 block_opts.set_block_cache(&cache);
134 options.set_block_based_table_factory(&block_opts);
135
136 Ok(Self {
137 db: DB::open_cf(&options, &config.path, Column::iter())?,
138 })
139 }
140
141 fn has(&self, col: Column, key: Slice<'_>) -> EyreResult<bool> {
142 let cf_handle = self.try_cf_handle(col)?;
143
144 let exists = self.db.key_may_exist_cf(cf_handle, key.as_ref())
145 && self.get(col, key).map(|value| value.is_some())?;
146
147 Ok(exists)
148 }
149
150 fn get(&self, col: Column, key: Slice<'_>) -> EyreResult<Option<Slice<'_>>> {
151 let cf_handle = self.try_cf_handle(col)?;
152
153 let value = self.db.get_pinned_cf(cf_handle, key.as_ref())?;
154
155 Ok(value.map(Slice::from_owned))
156 }
157
158 fn put(&self, col: Column, key: Slice<'_>, value: Slice<'_>) -> EyreResult<()> {
159 let cf_handle = self.try_cf_handle(col)?;
160
161 self.db.put_cf(cf_handle, key.as_ref(), value.as_ref())?;
162
163 Ok(())
164 }
165
166 fn delete(&self, col: Column, key: Slice<'_>) -> EyreResult<()> {
167 let cf_handle = self.try_cf_handle(col)?;
168
169 self.db.delete_cf(cf_handle, key.as_ref())?;
170
171 Ok(())
172 }
173
174 fn iter(&self, col: Column) -> EyreResult<Iter<'_>> {
175 let cf_handle = self.try_cf_handle(col)?;
176
177 let mut iter = self.db.raw_iterator_cf(cf_handle);
178
179 iter.seek_to_first();
180
181 Ok(Iter::new(DBIterator { ready: true, iter }))
182 }
183
184 fn apply(&self, tx: &Transaction<'_>) -> EyreResult<()> {
185 let mut batch = WriteBatch::default();
186
187 let mut unknown_cfs = vec![];
188
189 for (entry, op) in tx.iter() {
190 let (col, key) = (entry.column(), entry.key());
191
192 let Some(cf) = self.cf_handle(col) else {
193 unknown_cfs.push(col);
194 continue;
195 };
196 match op {
197 Operation::Put { value } => batch.put_cf(cf, key, value),
198 Operation::Delete => batch.delete_cf(cf, key),
199 }
200 }
201
202 if !unknown_cfs.is_empty() {
203 bail!("unknown column families: {:?}", unknown_cfs);
204 }
205
206 self.db.write(batch)?;
207
208 Ok(())
209 }
210
211 fn iter_snapshot(&self, col: Column) -> EyreResult<Iter<'_>> {
212 let cf_handle = self.try_cf_handle(col)?;
213 let snapshot = self.db.snapshot();
214
215 let mut read_opts = ReadOptions::default();
217 read_opts.set_snapshot(&snapshot);
218
219 let mut iter = self.db.raw_iterator_cf_opt(cf_handle, read_opts);
221 iter.seek_to_first();
222
223 Ok(Iter::new(SnapshotIterator {
224 ready: true,
225 iter,
226 _snapshot: snapshot,
227 }))
228 }
229}
230
231struct DBIterator<'a> {
232 ready: bool,
233 iter: DBRawIteratorWithThreadMode<'a, DB>,
234}
235
236struct SnapshotIterator<'a> {
241 ready: bool,
242 iter: DBRawIteratorWithThreadMode<'a, DB>,
247 _snapshot: Snapshot<'a>,
250}
251
252impl DBIter for DBIterator<'_> {
253 fn seek(&mut self, key: Slice<'_>) -> EyreResult<Option<Slice<'_>>> {
254 self.iter.seek(key);
255
256 self.ready = false;
257
258 Ok(self.iter.key().map(Into::into))
259 }
260
261 fn next(&mut self) -> EyreResult<Option<Slice<'_>>> {
262 if self.ready {
263 self.ready = false;
264 } else {
265 self.iter.next();
266 }
267
268 Ok(self.iter.key().map(Into::into))
269 }
270
271 fn read(&self) -> EyreResult<Slice<'_>> {
272 let Some(value) = self.iter.value() else {
273 bail!("missing value for iterator entry {:?}", self.iter.key());
274 };
275
276 Ok(value.into())
277 }
278}
279
280impl DBIter for SnapshotIterator<'_> {
281 fn seek(&mut self, key: Slice<'_>) -> EyreResult<Option<Slice<'_>>> {
282 self.iter.seek(key);
283
284 self.ready = false;
285
286 Ok(self.iter.key().map(Into::into))
287 }
288
289 fn next(&mut self) -> EyreResult<Option<Slice<'_>>> {
290 if self.ready {
291 self.ready = false;
292 } else {
293 self.iter.next();
294 }
295
296 Ok(self.iter.key().map(Into::into))
297 }
298
299 fn read(&self) -> EyreResult<Slice<'_>> {
300 let Some(value) = self.iter.value() else {
301 bail!("missing value for iterator entry {:?}", self.iter.key());
302 };
303
304 Ok(value.into())
305 }
306}