cloud_mmr/storage/
lmdb.rs

1// Copyright 2021 The Grin Developers
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Storage of core types using LMDB.
16
17use crate::ser::{self, DeserializationMode, ProtocolVersion};
18use lmdb_zero as lmdb;
19use lmdb_zero::traits::CreateCursor;
20use lmdb_zero::LmdbResultExt;
21use parking_lot::RwLock;
22use std::fs;
23use std::sync::Arc;
24
/// Number of bytes to grow the database by when needed.
pub const ALLOC_CHUNK_SIZE_DEFAULT: usize = 134_217_728; //128 MB
/// And for test mode, to avoid too much disk allocation on windows
pub const ALLOC_CHUNK_SIZE_DEFAULT_TEST: usize = 1_048_576; //1 MB
/// Used fraction of the map size above which `Store::needs_resize` reports
/// that a resize is required.
const RESIZE_PERCENT: f32 = 0.9;
/// Target used for sizing in `Store::do_resize`: chunks are added until the
/// used fraction of the map is at most this value (i.e. at least 35% of the
/// map is free after a resize).
const RESIZE_MIN_TARGET_PERCENT: f32 = 0.65;
33
/// Main error type for this LMDB-backed store.
///
/// Every fallible operation in this module returns this type. LMDB and
/// serialization errors are wrapped in dedicated variants.
#[derive(Clone, Eq, PartialEq, Debug, thiserror::Error)]
pub enum Error {
    /// Couldn't find what we were looking for.
    /// Also returned when the database handle is currently not open.
    #[error("DB Not Found Error: {0}")]
    NotFoundErr(String),
    /// Wraps an error originating from LMDB
    #[error("LMDB error: {0}")]
    LmdbErr(lmdb::error::Error),
    /// Wraps a serialization error for Writeable or Readable
    #[error("Serialization Error: {0}")]
    SerErr(ser::Error),
    /// File handling error (e.g. the environment directory could not be created)
    #[error("File handling Error: {0}")]
    FileErr(String),
    /// Other error
    #[error("Other Error: {0}")]
    OtherErr(String),
}
53
54impl From<lmdb::error::Error> for Error {
55    fn from(e: lmdb::error::Error) -> Error {
56        Error::LmdbErr(e)
57    }
58}
59
60impl From<ser::Error> for Error {
61    fn from(e: ser::Error) -> Error {
62        Error::SerErr(e)
63    }
64}
65
66/// unwraps the inner option by converting the none case to a not found error
67pub fn option_to_not_found<T, F>(res: Result<Option<T>, Error>, field_name: F) -> Result<T, Error>
68where
69    F: Fn() -> String,
70{
71    match res {
72        Ok(None) => Err(Error::NotFoundErr(field_name())),
73        Ok(Some(o)) => Ok(o),
74        Err(e) => Err(e),
75    }
76}
77
/// Protocol version assigned to stores created through `Store::new`.
const DEFAULT_DB_VERSION: ProtocolVersion = ProtocolVersion(3);

/// LMDB-backed store facilitating data access and serialization. All writes
/// are done through a Batch abstraction providing atomicity.
pub struct Store {
    /// Shared LMDB environment.
    env: Arc<lmdb::Environment>,
    /// Shared handle to the open database; `None` while the handle is closed
    /// (it is temporarily cleared during a resize).
    db: Arc<RwLock<Option<Arc<lmdb::Database<'static>>>>>,
    /// Name of the database opened inside the environment.
    name: String,
    /// Protocol version used when (de)serializing values.
    version: ProtocolVersion,
    /// Number of bytes the map is grown by on each resize step.
    alloc_chunk_size: usize,
}
89
90impl Store {
91    /// Create a new LMDB env under the provided directory.
92    /// By default creates an environment named "lmdb".
93    /// Be aware of transactional semantics in lmdb
94    /// (transactions are per environment, not per database).
95    pub fn new(
96        root_path: &str,
97        env_name: Option<&str>,
98        db_name: Option<&str>,
99        max_readers: Option<u32>,
100    ) -> Result<Store, Error> {
101        let name = match env_name {
102            Some(n) => n.to_owned(),
103            None => "lmdb".to_owned(),
104        };
105        let db_name = match db_name {
106            Some(n) => n.to_owned(),
107            None => "lmdb".to_owned(),
108        };
109        let full_path = [root_path.to_owned(), name].join("/");
110        fs::create_dir_all(&full_path).map_err(|e| {
111            Error::FileErr(format!(
112                "Unable to create directory 'db_root' to store chain_data: {:?}",
113                e
114            ))
115        })?;
116
117        let mut env_builder = lmdb::EnvBuilder::new()?;
118        env_builder.set_maxdbs(8)?;
119
120        if let Some(max_readers) = max_readers {
121            env_builder.set_maxreaders(max_readers)?;
122        }
123
124        let alloc_chunk_size = ALLOC_CHUNK_SIZE_DEFAULT_TEST;
125
126        let env = unsafe { env_builder.open(&full_path, lmdb::open::NOTLS, 0o600)? };
127
128        debug!("DB Mapsize for {} is {}", full_path, env.info()?.mapsize);
129        let res = Store {
130            env: Arc::new(env),
131            db: Arc::new(RwLock::new(None)),
132            name: db_name,
133            version: DEFAULT_DB_VERSION,
134            alloc_chunk_size,
135        };
136
137        {
138            let mut w = res.db.write();
139            *w = Some(Arc::new(lmdb::Database::open(
140                res.env.clone(),
141                Some(&res.name),
142                &lmdb::DatabaseOptions::new(lmdb::db::CREATE),
143            )?));
144        }
145        Ok(res)
146    }
147
148    /// Construct a new store using a specific protocol version.
149    /// Permits access to the db with legacy protocol versions for db migrations.
150    pub fn with_version(&self, version: ProtocolVersion) -> Store {
151        let alloc_chunk_size = ALLOC_CHUNK_SIZE_DEFAULT_TEST;
152        Store {
153            env: self.env.clone(),
154            db: self.db.clone(),
155            name: self.name.clone(),
156            version,
157            alloc_chunk_size,
158        }
159    }
160
    /// Protocol version for the store.
    /// This is the version used by `get_ser` when deserializing values.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.version
    }
165
166    /// Opens the database environment
167    pub fn open(&self) -> Result<(), Error> {
168        let mut w = self.db.write();
169        *w = Some(Arc::new(lmdb::Database::open(
170            self.env.clone(),
171            Some(&self.name),
172            &lmdb::DatabaseOptions::new(lmdb::db::CREATE),
173        )?));
174        Ok(())
175    }
176
177    /// Determines whether the environment needs a resize based on a simple percentage threshold
178    pub fn needs_resize(&self) -> Result<bool, Error> {
179        let env_info = self.env.info()?;
180        let stat = self.env.stat()?;
181
182        let size_used = stat.psize as usize * env_info.last_pgno;
183        trace!("DB map size: {}", env_info.mapsize);
184        trace!("Space used: {}", size_used);
185        trace!("Space remaining: {}", env_info.mapsize - size_used);
186        let resize_percent = RESIZE_PERCENT;
187        trace!(
188            "Percent used: {:.*}  Percent threshold: {:.*}",
189            4,
190            size_used as f64 / env_info.mapsize as f64,
191            4,
192            resize_percent
193        );
194
195        if size_used as f32 / env_info.mapsize as f32 > resize_percent
196            || env_info.mapsize < self.alloc_chunk_size
197        {
198            trace!("Resize threshold met (percent-based)");
199            Ok(true)
200        } else {
201            trace!("Resize threshold not met (percent-based)");
202            Ok(false)
203        }
204    }
205
206    /// Increments the database size by as many ALLOC_CHUNK_SIZES
207    /// to give a minimum threshold of free space
208    pub fn do_resize(&self) -> Result<(), Error> {
209        let env_info = self.env.info()?;
210        let stat = self.env.stat()?;
211        let size_used = stat.psize as usize * env_info.last_pgno;
212
213        let new_mapsize = if env_info.mapsize < self.alloc_chunk_size {
214            self.alloc_chunk_size
215        } else {
216            let mut tot = env_info.mapsize;
217            while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT {
218                tot += self.alloc_chunk_size;
219            }
220            tot
221        };
222
223        // close
224        let mut w = self.db.write();
225        *w = None;
226
227        unsafe {
228            self.env.set_mapsize(new_mapsize)?;
229        }
230
231        *w = Some(Arc::new(lmdb::Database::open(
232            self.env.clone(),
233            Some(&self.name),
234            &lmdb::DatabaseOptions::new(lmdb::db::CREATE),
235        )?));
236
237        info!(
238            "Resized database from {} to {}",
239            env_info.mapsize, new_mapsize
240        );
241        Ok(())
242    }
243
244    /// Gets a value from the db, provided its key.
245    /// Deserializes the retrieved data using the provided function.
246    pub fn get_with<F, T>(
247        &self,
248        key: &[u8],
249        access: &lmdb::ConstAccessor<'_>,
250        db: &lmdb::Database<'_>,
251        deserialize: F,
252    ) -> Result<Option<T>, Error>
253    where
254        F: Fn(&[u8], &[u8]) -> Result<T, Error>,
255    {
256        let res: Option<&[u8]> = access.get(db, key).to_opt()?;
257        match res {
258            None => Ok(None),
259            Some(res) => deserialize(key, res).map(Some),
260        }
261    }
262
263    /// Gets a `Readable` value from the db, provided its key.
264    /// Note: Creates a new read transaction so will *not* see any uncommitted data.
265    pub fn get_ser<T: ser::Readable>(
266        &self,
267        key: &[u8],
268        deser_mode: Option<DeserializationMode>,
269    ) -> Result<Option<T>, Error> {
270        let lock = self.db.read();
271        let db = lock
272            .as_ref()
273            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
274        let txn = lmdb::ReadTransaction::new(self.env.clone())?;
275        let access = txn.access();
276        let d = match deser_mode {
277            Some(d) => d,
278            _ => DeserializationMode::default(),
279        };
280        self.get_with(key, &access, &db, |_, mut data| {
281            ser::deserialize(&mut data, self.protocol_version(), d).map_err(From::from)
282        })
283    }
284
285    /// Whether the provided key exists
286    pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
287        let lock = self.db.read();
288        let db = lock
289            .as_ref()
290            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
291        let txn = lmdb::ReadTransaction::new(self.env.clone())?;
292        let access = txn.access();
293
294        let res: Option<&lmdb::Ignore> = access.get(db, key).to_opt()?;
295        Ok(res.is_some())
296    }
297
298    /// Produces an iterator from the provided key prefix.
299    pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
300    where
301        F: Fn(&[u8], &[u8]) -> Result<T, Error>,
302    {
303        let lock = self.db.read();
304        let db = lock
305            .as_ref()
306            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
307        let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?);
308        let cursor = Arc::new(tx.cursor(db.clone())?);
309        Ok(PrefixIterator::new(tx, cursor, prefix, deserialize))
310    }
311
312    /// Builds a new batch to be used with this store.
313    pub fn batch(&self) -> Result<Batch<'_>, Error> {
314        // check if the db needs resizing before returning the batch
315        if self.needs_resize()? {
316            self.do_resize()?;
317        }
318        let tx = lmdb::WriteTransaction::new(self.env.clone())?;
319        Ok(Batch { store: self, tx })
320    }
321}
322
/// Batch to write multiple Writeables to db in an atomic manner.
///
/// Wraps a single LMDB write transaction; nothing is persisted until
/// `commit` is called.
pub struct Batch<'a> {
    /// Store this batch writes to.
    store: &'a Store,
    /// Write transaction backing the batch.
    tx: lmdb::WriteTransaction<'a>,
}
328
329impl<'a> Batch<'a> {
330    /// Writes a single key/value pair to the db
331    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
332        let lock = self.store.db.read();
333        let db = lock
334            .as_ref()
335            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
336        self.tx
337            .access()
338            .put(db, key, value, lmdb::put::Flags::empty())?;
339        Ok(())
340    }
341
342    /// Writes a single key and its `Writeable` value to the db.
343    /// Encapsulates serialization using the (default) version configured on the store instance.
344    pub fn put_ser<W: ser::Writeable>(&self, key: &[u8], value: &W) -> Result<(), Error> {
345        self.put_ser_with_version(key, value, self.store.protocol_version())
346    }
347
    /// Protocol version used by this batch.
    /// Always the protocol version of the underlying store.
    pub fn protocol_version(&self) -> ProtocolVersion {
        self.store.protocol_version()
    }
352
353    /// Writes a single key and its `Writeable` value to the db.
354    /// Encapsulates serialization using the specified protocol version.
355    pub fn put_ser_with_version<W: ser::Writeable>(
356        &self,
357        key: &[u8],
358        value: &W,
359        version: ProtocolVersion,
360    ) -> Result<(), Error> {
361        let ser_value = ser::ser_vec(value, version);
362        match ser_value {
363            Ok(data) => self.put(key, &data),
364            Err(err) => Err(err.into()),
365        }
366    }
367
368    /// Low-level access for retrieving data by key.
369    /// Takes a function for flexible deserialization.
370    pub fn get_with<F, T>(&self, key: &[u8], deserialize: F) -> Result<Option<T>, Error>
371    where
372        F: Fn(&[u8], &[u8]) -> Result<T, Error>,
373    {
374        let access = self.tx.access();
375        let lock = self.store.db.read();
376        let db = lock
377            .as_ref()
378            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
379
380        self.store.get_with(key, &access, &db, deserialize)
381    }
382
383    /// Whether the provided key exists.
384    /// This is in the context of the current write transaction.
385    pub fn exists(&self, key: &[u8]) -> Result<bool, Error> {
386        let access = self.tx.access();
387        let lock = self.store.db.read();
388        let db = lock
389            .as_ref()
390            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
391        let res: Option<&lmdb::Ignore> = access.get(db, key).to_opt()?;
392        Ok(res.is_some())
393    }
394
395    /// Produces an iterator from the provided key prefix.
396    pub fn iter<F, T>(&self, prefix: &[u8], deserialize: F) -> Result<PrefixIterator<F, T>, Error>
397    where
398        F: Fn(&[u8], &[u8]) -> Result<T, Error>,
399    {
400        self.store.iter(prefix, deserialize)
401    }
402
403    /// Gets a `Readable` value from the db by provided key and provided deserialization strategy.
404    pub fn get_ser<T: ser::Readable>(
405        &self,
406        key: &[u8],
407        deser_mode: Option<DeserializationMode>,
408    ) -> Result<Option<T>, Error> {
409        let d = match deser_mode {
410            Some(d) => d,
411            _ => DeserializationMode::default(),
412        };
413        self.get_with(key, |_, mut data| {
414            match ser::deserialize(&mut data, self.protocol_version(), d) {
415                Ok(res) => Ok(res),
416                Err(e) => Err(From::from(e)),
417            }
418        })
419    }
420
421    /// Deletes a key/value pair from the db
422    pub fn delete(&self, key: &[u8]) -> Result<(), Error> {
423        let lock = self.store.db.read();
424        let db = lock
425            .as_ref()
426            .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?;
427        self.tx.access().del_key(db, key)?;
428        Ok(())
429    }
430
431    /// Writes the batch to db
432    pub fn commit(self) -> Result<(), Error> {
433        self.tx.commit()?;
434        Ok(())
435    }
436
437    /// Creates a child of this batch. It will be merged with its parent on
438    /// commit, abandoned otherwise.
439    pub fn child(&mut self) -> Result<Batch<'_>, Error> {
440        Ok(Batch {
441            store: self.store,
442            tx: self.tx.child_tx()?,
443        })
444    }
445}
446
/// An iterator based on key prefix.
/// Caller is responsible for deserialization of the data.
pub struct PrefixIterator<F, T>
where
    F: Fn(&[u8], &[u8]) -> Result<T, Error>,
{
    /// Read transaction the cursor operates in.
    tx: Arc<lmdb::ReadTransaction<'static>>,
    /// Cursor walking the database entries.
    cursor: Arc<lmdb::Cursor<'static, 'static>>,
    /// `false` until the first call to `next`, which positions the cursor at
    /// the prefix; later calls simply advance the cursor.
    seek: bool,
    /// Key prefix every yielded entry must start with.
    prefix: Vec<u8>,
    /// Caller-supplied function turning a (key, value) pair into an item.
    deserialize: F,
}
459
460impl<F, T> Iterator for PrefixIterator<F, T>
461where
462    F: Fn(&[u8], &[u8]) -> Result<T, Error>,
463{
464    type Item = T;
465
466    fn next(&mut self) -> Option<Self::Item> {
467        let access = self.tx.access();
468        let cursor = Arc::get_mut(&mut self.cursor).expect("failed to get cursor");
469        let kv: Result<(&[u8], &[u8]), _> = if self.seek {
470            cursor.next(&access)
471        } else {
472            self.seek = true;
473            cursor.seek_range_k(&access, &self.prefix[..])
474        };
475        kv.ok()
476            .filter(|(k, _)| k.starts_with(self.prefix.as_slice()))
477            .map(|(k, v)| match (self.deserialize)(k, v) {
478                Ok(v) => Some(v),
479                Err(_) => None,
480            })
481            .flatten()
482    }
483}
484
485impl<F, T> PrefixIterator<F, T>
486where
487    F: Fn(&[u8], &[u8]) -> Result<T, Error>,
488{
489    /// Initialize a new prefix iterator.
490    pub fn new(
491        tx: Arc<lmdb::ReadTransaction<'static>>,
492        cursor: Arc<lmdb::Cursor<'static, 'static>>,
493        prefix: &[u8],
494        deserialize: F,
495    ) -> PrefixIterator<F, T> {
496        PrefixIterator {
497            tx,
498            cursor,
499            seek: false,
500            prefix: prefix.to_vec(),
501            deserialize,
502        }
503    }
504}