1use crate::ser::{self, DeserializationMode, ProtocolVersion};
18use lmdb_zero as lmdb;
19use lmdb_zero::traits::CreateCursor;
20use lmdb_zero::LmdbResultExt;
21use parking_lot::RwLock;
22use std::fs;
23use std::sync::Arc;
24
/// Default allocation chunk size in bytes (128 MiB).
/// Not referenced by the code visible in this part of the file.
pub const ALLOC_CHUNK_SIZE_DEFAULT: usize = 128 * 1024 * 1024;

/// Smaller allocation chunk size in bytes (1 MiB). This is the value that
/// `Store::new` and `Store::with_version` assign to `alloc_chunk_size`.
pub const ALLOC_CHUNK_SIZE_DEFAULT_TEST: usize = 1024 * 1024;

/// Fraction of the map in use above which `Store::needs_resize` reports `true`.
const RESIZE_PERCENT: f32 = 0.9;

/// `Store::do_resize` keeps adding chunks until the used fraction of the map
/// is no longer above this value.
const RESIZE_MIN_TARGET_PERCENT: f32 = 0.65;
33
/// Errors returned by the store, its batches and the helpers in this file.
#[derive(Clone, Eq, PartialEq, Debug, thiserror::Error)]
pub enum Error {
    /// Something was looked up and not found. Produced by
    /// `option_to_not_found` and when the database handle is currently `None`.
    #[error("DB Not Found Error: {0}")]
    NotFoundErr(String),
    /// An error reported by the underlying LMDB bindings.
    #[error("LMDB error: {0}")]
    LmdbErr(lmdb::error::Error),
    /// An error reported by the serialization layer (`ser`).
    #[error("Serialization Error: {0}")]
    SerErr(ser::Error),
    /// A filesystem problem, e.g. the environment directory could not be created.
    #[error("File handling Error: {0}")]
    FileErr(String),
    /// Any other failure, described by its message.
    #[error("Other Error: {0}")]
    OtherErr(String),
}
53
54impl From<lmdb::error::Error> for Error {
55 fn from(e: lmdb::error::Error) -> Error {
56 Error::LmdbErr(e)
57 }
58}
59
60impl From<ser::Error> for Error {
61 fn from(e: ser::Error) -> Error {
62 Error::SerErr(e)
63 }
64}
65
66pub fn option_to_not_found<T, F>(res: Result<Option<T>, Error>, field_name: F) -> Result<T, Error>
68where
69 F: Fn() -> String,
70{
71 match res {
72 Ok(None) => Err(Error::NotFoundErr(field_name())),
73 Ok(Some(o)) => Ok(o),
74 Err(e) => Err(e),
75 }
76}
77
/// Protocol version given to every `Store` created through `Store::new`.
const DEFAULT_DB_VERSION: ProtocolVersion = ProtocolVersion(3);
79
/// Handle on one named LMDB database inside an LMDB environment.
pub struct Store {
    /// Shared LMDB environment.
    env: Arc<lmdb::Environment>,
    /// Shared, lockable handle on the database. It is `None` until opened and
    /// is temporarily reset to `None` by `do_resize` while the map size changes.
    db: Arc<RwLock<Option<Arc<lmdb::Database<'static>>>>>,
    /// Name passed to `lmdb::Database::open` for this database.
    name: String,
    /// Protocol version used when (de)serializing values.
    version: ProtocolVersion,
    /// Increment (in bytes) by which `do_resize` grows the map size.
    alloc_chunk_size: usize,
}
89
90impl Store {
91 pub fn new(
96 root_path: &str,
97 env_name: Option<&str>,
98 db_name: Option<&str>,
99 max_readers: Option<u32>,
100 ) -> Result<Store, Error> {
101 let name = match env_name {
102 Some(n) => n.to_owned(),
103 None => "lmdb".to_owned(),
104 };
105 let db_name = match db_name {
106 Some(n) => n.to_owned(),
107 None => "lmdb".to_owned(),
108 };
109 let full_path = [root_path.to_owned(), name].join("/");
110 fs::create_dir_all(&full_path).map_err(|e| {
111 Error::FileErr(format!(
112 "Unable to create directory 'db_root' to store chain_data: {:?}",
113 e
114 ))
115 })?;
116
117 let mut env_builder = lmdb::EnvBuilder::new()?;
118 env_builder.set_maxdbs(8)?;
119
120 if let Some(max_readers) = max_readers {
121 env_builder.set_maxreaders(max_readers)?;
122 }
123
124 let alloc_chunk_size = ALLOC_CHUNK_SIZE_DEFAULT_TEST;
125
126 let env = unsafe { env_builder.open(&full_path, lmdb::open::NOTLS, 0o600)? };
127
128 debug!("DB Mapsize for {} is {}", full_path, env.info()?.mapsize);
129 let res = Store {
130 env: Arc::new(env),
131 db: Arc::new(RwLock::new(None)),
132 name: db_name,
133 version: DEFAULT_DB_VERSION,
134 alloc_chunk_size,
135 };
136
137 {
138 let mut w = res.db.write();
139 *w = Some(Arc::new(lmdb::Database::open(
140 res.env.clone(),
141 Some(&res.name),
142 &lmdb::DatabaseOptions::new(lmdb::db::CREATE),
143 )?));
144 }
145 Ok(res)
146 }
147
148 pub fn with_version(&self, version: ProtocolVersion) -> Store {
151 let alloc_chunk_size = ALLOC_CHUNK_SIZE_DEFAULT_TEST;
152 Store {
153 env: self.env.clone(),
154 db: self.db.clone(),
155 name: self.name.clone(),
156 version,
157 alloc_chunk_size,
158 }
159 }
160
    /// Protocol version this store uses when (de)serializing values.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.version
    }
165
166 pub fn open(&self) -> Result<(), Error> {
168 let mut w = self.db.write();
169 *w = Some(Arc::new(lmdb::Database::open(
170 self.env.clone(),
171 Some(&self.name),
172 &lmdb::DatabaseOptions::new(lmdb::db::CREATE),
173 )?));
174 Ok(())
175 }
176
177 pub fn needs_resize(&self) -> Result<bool, Error> {
179 let env_info = self.env.info()?;
180 let stat = self.env.stat()?;
181
182 let size_used = stat.psize as usize * env_info.last_pgno;
183 trace!("DB map size: {}", env_info.mapsize);
184 trace!("Space used: {}", size_used);
185 trace!("Space remaining: {}", env_info.mapsize - size_used);
186 let resize_percent = RESIZE_PERCENT;
187 trace!(
188 "Percent used: {:.*} Percent threshold: {:.*}",
189 4,
190 size_used as f64 / env_info.mapsize as f64,
191 4,
192 resize_percent
193 );
194
195 if size_used as f32 / env_info.mapsize as f32 > resize_percent
196 || env_info.mapsize < self.alloc_chunk_size
197 {
198 trace!("Resize threshold met (percent-based)");
199 Ok(true)
200 } else {
201 trace!("Resize threshold not met (percent-based)");
202 Ok(false)
203 }
204 }
205
206 pub fn do_resize(&self) -> Result<(), Error> {
209 let env_info = self.env.info()?;
210 let stat = self.env.stat()?;
211 let size_used = stat.psize as usize * env_info.last_pgno;
212
213 let new_mapsize = if env_info.mapsize < self.alloc_chunk_size {
214 self.alloc_chunk_size
215 } else {
216 let mut tot = env_info.mapsize;
217 while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT {
218 tot += self.alloc_chunk_size;
219 }
220 tot
221 };
222
223 let mut w = self.db.write();
225 *w = None;
226
227 unsafe {
228 self.env.set_mapsize(new_mapsize)?;
229 }
230
231 *w = Some(Arc::new(lmdb::Database::open(
232 self.env.clone(),
233 Some(&self.name),
234 &lmdb::DatabaseOptions::new(lmdb::db::CREATE),
235 )?));
236
237 info!(
238 "Resized database from {} to {}",
239 env_info.mapsize, new_mapsize
240 );
241 Ok(())
242 }
243
244 pub fn get_with<F, T>(
247 &self,
248 key: &[u8],
249 access: &lmdb::ConstAccessor<'_>,
250 db: &lmdb::Database<'_>,
251 deserialize: F,
252 ) -> Result<Option<T>, Error>
253 where
254 F: Fn(&[u8], &[u8]) -> Result<T, Error>,
255 {
256 let res: Option<&[u8]> = access.get(db, key).to_opt()?;
257 match res {
258 None => Ok(None),
259 Some(res) => deserialize(key, res).map(Some),
260 }
261 }
262
263 pub fn get_ser<T: ser::Readable>(
266 &self,
267 key: &[u8],
268 deser_mode: Option<DeserializationMode>,
269 ) -> Result<Option<T>, Error> {
270 let lock = self.db.read();
271 let db = lock
272 .as_ref()
273 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
274 let txn = lmdb::ReadTransaction::new(self.env.clone())?;
275 let access = txn.access();
276 let d = match deser_mode {
277 Some(d) => d,
278 _ => DeserializationMode::default(),
279 };
280 self.get_with(key, &access, &db, |_, mut data| {
281 ser::deserialize(&mut data, self.protocol_version(), d).map_err(From::from)
282 })
283 }
284
285 pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
287 let lock = self.db.read();
288 let db = lock
289 .as_ref()
290 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
291 let txn = lmdb::ReadTransaction::new(self.env.clone())?;
292 let access = txn.access();
293
294 let res: Option<&lmdb::Ignore> = access.get(db, key).to_opt()?;
295 Ok(res.is_some())
296 }
297
298 pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
300 where
301 F: Fn(&[u8], &[u8]) -> Result<T, Error>,
302 {
303 let lock = self.db.read();
304 let db = lock
305 .as_ref()
306 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
307 let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
308 let cursor = Arc::new(tx.cursor(db.clone())?);
309 Ok(PrefixIterator::new(tx, cursor, prefix, deserialize))
310 }
311
312 pub fn batch(&self) -> Result<Batch<'_>, Error> {
314 if self.needs_resize()? {
316 self.do_resize()?;
317 }
318 let tx = lmdb::WriteTransaction::new(self.env.clone())?;
319 Ok(Batch { store: self, tx })
320 }
321}
322
/// A set of reads and writes performed through one LMDB write transaction.
pub struct Batch<'a> {
    /// Store this batch was created from; supplies the db handle and version.
    store: &'a Store,
    /// Write transaction that the batch's puts, gets and deletes go through.
    tx: lmdb::WriteTransaction<'a>,
}
328
329impl<'a> Batch<'a> {
330 pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
332 let lock = self.store.db.read();
333 let db = lock
334 .as_ref()
335 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
336 self.tx
337 .access()
338 .put(db, key, value, lmdb::put::Flags::empty())?;
339 Ok(())
340 }
341
342 pub fn put_ser<W: ser::Writeable>(&self, key: &[u8], value: &W) -> Result<(), Error> {
345 self.put_ser_with_version(key, value, self.store.protocol_version())
346 }
347
    /// Protocol version of the store this batch belongs to.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.store.protocol_version()
    }
352
353 pub fn put_ser_with_version<W: ser::Writeable>(
356 &self,
357 key: &[u8],
358 value: &W,
359 version: ProtocolVersion,
360 ) -> Result<(), Error> {
361 let ser_value = ser::ser_vec(value, version);
362 match ser_value {
363 Ok(data) => self.put(key, &data),
364 Err(err) => Err(err.into()),
365 }
366 }
367
368 pub fn get_with<F, T>(&self, key: &[u8], deserialize: F) -> Result<Option<T>, Error>
371 where
372 F: Fn(&[u8], &[u8]) -> Result<T, Error>,
373 {
374 let access = self.tx.access();
375 let lock = self.store.db.read();
376 let db = lock
377 .as_ref()
378 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
379
380 self.store.get_with(key, &access, &db, deserialize)
381 }
382
383 pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
386 let access = self.tx.access();
387 let lock = self.store.db.read();
388 let db = lock
389 .as_ref()
390 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
391 let res: Option<&lmdb::Ignore> = access.get(db, key).to_opt()?;
392 Ok(res.is_some())
393 }
394
    /// Builds a prefix iterator by delegating to `Store::iter`.
    ///
    /// The iterator is created on the store, not on this batch's write
    /// transaction.
    /// NOTE(review): entries written through this batch but not yet committed
    /// are therefore presumably not visible to it; confirm against LMDB's
    /// transaction semantics.
    pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
    where
        F: Fn(&[u8], &[u8]) -> Result<T, Error>,
    {
        self.store.iter(prefix, deserialize)
    }
402
403 pub fn get_ser<T: ser::Readable>(
405 &self,
406 key: &[u8],
407 deser_mode: Option<DeserializationMode>,
408 ) -> Result<Option<T>, Error> {
409 let d = match deser_mode {
410 Some(d) => d,
411 _ => DeserializationMode::default(),
412 };
413 self.get_with(key, |_, mut data| {
414 match ser::deserialize(&mut data, self.protocol_version(), d) {
415 Ok(res) => Ok(res),
416 Err(e) => Err(From::from(e)),
417 }
418 })
419 }
420
421 pub fn delete(&self, key: &[u8]) -> Result<(), Error> {
423 let lock = self.store.db.read();
424 let db = lock
425 .as_ref()
426 .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
427 self.tx.access().del_key(db, key)?;
428 Ok(())
429 }
430
431 pub fn commit(self) -> Result<(), Error> {
433 self.tx.commit()?;
434 Ok(())
435 }
436
437 pub fn child(&mut self) -> Result<Batch<'_>, Error> {
440 Ok(Batch {
441 store: self.store,
442 tx: self.tx.child_tx()?,
443 })
444 }
445}
446
/// Iterator over database entries whose key starts with a given prefix,
/// yielding the values produced by a caller-supplied deserializer.
pub struct PrefixIterator<F, T>
where
    F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
    /// Read transaction from which the accessor for each step is obtained.
    tx: Arc<lmdb::ReadTransaction<'static>>,
    /// Cursor that walks the database; must be uniquely owned when `next` runs.
    cursor: Arc<lmdb::Cursor<'static, 'static>>,
    /// `false` until the first call to `next` has positioned the cursor.
    seek: bool,
    /// Key prefix every yielded entry must start with.
    prefix: Vec<u8>,
    /// Turns a `(key, value)` byte pair into an item.
    deserialize: F,
}
459
460impl<F, T> Iterator for PrefixIterator<F, T>
461where
462 F: Fn(&[u8], &[u8]) -> Result<T, Error>,
463{
464 type Item = T;
465
466 fn next(&mut self) -> Option<Self::Item> {
467 let access = self.tx.access();
468 let cursor = Arc::get_mut(&mut self.cursor).expect("failed to get cursor");
469 let kv: Result<(&[u8], &[u8]), _> = if self.seek {
470 cursor.next(&access)
471 } else {
472 self.seek = true;
473 cursor.seek_range_k(&access, &self.prefix[..])
474 };
475 kv.ok()
476 .filter(|(k, _)| k.starts_with(self.prefix.as_slice()))
477 .map(|(k, v)| match (self.deserialize)(k, v) {
478 Ok(v) => Some(v),
479 Err(_) => None,
480 })
481 .flatten()
482 }
483}
484
485impl<F, T> PrefixIterator<F, T>
486where
487 F: Fn(&[u8], &[u8]) -> Result<T, Error>,
488{
489 pub fn new(
491 tx: Arc<lmdb::ReadTransaction<'static>>,
492 cursor: Arc<lmdb::Cursor<'static, 'static>>,
493 prefix: &[u8],
494 deserialize: F,
495 ) -> PrefixIterator<F, T> {
496 PrefixIterator {
497 tx,
498 cursor,
499 seek: false,
500 prefix: prefix.to_vec(),
501 deserialize,
502 }
503 }
504}