use holochain_logging::prelude::*;
use lmdb::Error as LmdbError;
use rkv::{
DatabaseFlags, EnvironmentFlags, Manager, Rkv, SingleStore, StoreError, StoreOptions, Value,
};
use std::{
path::Path,
sync::{Arc, RwLock},
};
// Default initial size of the LMDB memory map: 100 MiB. The map is doubled
// on demand when a write fails with MapFull (see `LmdbInstance::add`).
const DEFAULT_INITIAL_MAP_BYTES: usize = 100 * 1024 * 1024;
/// Handle to a single named LMDB database plus the shared rkv environment
/// it lives in. Cloning is cheap: `store` is copyable and `manager` is an
/// `Arc`, so clones refer to the same underlying environment.
#[derive(Clone)]
pub(crate) struct LmdbInstance {
    // The named key/value database opened inside the environment.
    pub store: SingleStore,
    // Shared, lock-guarded rkv environment this store belongs to.
    pub manager: Arc<RwLock<Rkv>>,
}
impl LmdbInstance {
    /// Creates (or reuses) an rkv environment at `path/<db_name>.db` and opens
    /// a single store named `db_name` inside it.
    ///
    /// `initial_map_bytes` sets the starting LMDB map size, defaulting to
    /// [`DEFAULT_INITIAL_MAP_BYTES`]; the map grows automatically on writes
    /// that exceed it (see [`LmdbInstance::add`]).
    ///
    /// # Panics
    /// Panics if the directory cannot be created, the environment cannot be
    /// built, the manager lock is poisoned, or the store cannot be opened —
    /// failure here means persistence is unusable, so callers cannot proceed.
    pub fn new<P: AsRef<Path>>(
        db_name: &str,
        path: P,
        initial_map_bytes: Option<usize>,
    ) -> LmdbInstance {
        // LMDB environment paths are directories; "<db_name>.db" is the
        // directory holding data.mdb/lock.mdb.
        let db_path = path.as_ref().join(db_name).with_extension("db");
        std::fs::create_dir_all(&db_path).expect("Could not create file path for store");

        // The singleton Manager deduplicates environments per path, so two
        // instances opened on the same path share one Rkv.
        let manager = Manager::singleton()
            .write()
            .unwrap()
            .get_or_create(db_path.as_path(), |path: &Path| {
                let mut env_builder = Rkv::environment_builder();
                env_builder
                    .set_map_size(initial_map_bytes.unwrap_or(DEFAULT_INITIAL_MAP_BYTES))
                    .set_max_dbs(1)
                    // WRITE_MAP + MAP_ASYNC: writes go through the mmap and are
                    // flushed asynchronously — faster, at the cost of durability
                    // guarantees on crash.
                    .set_flags(EnvironmentFlags::WRITE_MAP | EnvironmentFlags::MAP_ASYNC);
                Rkv::from_env(path, env_builder)
            })
            .expect("Could not create the environment");

        let env = manager
            .read()
            .expect("Could not get a read lock on the manager");
        let options = StoreOptions {
            create: true,
            flags: DatabaseFlags::empty(),
        };
        let store: SingleStore = env
            .open_single(db_name, options)
            .expect("Could not create store");

        LmdbInstance {
            store,
            manager: manager.clone(),
        }
    }

    /// Writes `value` under `key`, committing immediately.
    ///
    /// If the write fails because the memory map is full, the map size is
    /// doubled and the write is retried; this repeats until the value fits,
    /// so entries larger than the current map still succeed.
    ///
    /// # Errors
    /// Propagates any [`StoreError`] other than `MapFull` from the put/commit.
    pub fn add<K: AsRef<[u8]> + Clone>(&self, key: K, value: &Value) -> Result<(), StoreError> {
        // Retry in a loop rather than recursing: recursion would re-acquire
        // the read lock while still holding it (risking deadlock if a writer
        // is queued) and grow the stack once per doubling for large values.
        loop {
            let env = self.manager.read().unwrap();
            let mut writer = env.write()?;
            match self
                .store
                .put(&mut writer, key.clone(), value)
                .and_then(|_| writer.commit())
            {
                Err(StoreError::LmdbError(LmdbError::MapFull)) => {
                    trace!("Insufficient space in MMAP, doubling and trying again");
                    let map_size = env.info()?.map_size();
                    env.set_map_size(map_size * 2)?;
                    // Guard dropped at end of iteration; retry with the bigger map.
                }
                r => return r.map(|_| ()),
            }
        }
    }

    /// Returns environment statistics (map size, page size, etc.).
    #[allow(dead_code)]
    pub fn info(&self) -> Result<rkv::Info, StoreError> {
        self.manager.read().unwrap().info()
    }
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use holochain_persistence_api::cas::{content::AddressableContent, storage::CasBencher};
    use tempfile::tempdir;

    /// Writing past the current map size must transparently double it —
    /// verified across two consecutive doublings.
    #[test]
    fn can_grow_map_on_write() {
        let initial_mmap_size = 1024 * 1024;
        let dir = tempdir().expect("Could not create a tempdir for CAS testing");
        let lmdb = LmdbInstance::new(
            "can_grow_map_on_write",
            dir.path(),
            Some(initial_mmap_size),
        );

        // Fill until the first doubling kicks in.
        while lmdb.info().unwrap().map_size() == initial_mmap_size {
            let content = CasBencher::random_addressable_content();
            lmdb.add(
                content.address(),
                &Value::Json(&content.content().to_string()),
            )
            .unwrap();
        }
        assert_eq!(lmdb.info().unwrap().map_size(), initial_mmap_size * 2);

        // Keep filling until the second doubling.
        while lmdb.info().unwrap().map_size() == 2 * initial_mmap_size {
            let content = CasBencher::random_addressable_content();
            lmdb.add(
                content.address(),
                &Value::Json(&content.content().to_string()),
            )
            .unwrap();
        }
        assert_eq!(lmdb.info().unwrap().map_size(), initial_mmap_size * 4);
    }

    /// A single entry ~10x the initial map size must still be written
    /// successfully (the map is doubled repeatedly until it fits).
    #[test]
    fn can_write_entry_larger_than_map() {
        let initial_mmap_size = 1024 * 1024;
        let dir = tempdir().expect("Could not create a tempdir for CAS testing");
        // Fixed copy-paste bug: this test previously opened its store under
        // the other test's name, "can_grow_map_on_write".
        let lmdb = LmdbInstance::new(
            "can_write_entry_larger_than_map",
            dir.path(),
            Some(initial_mmap_size),
        );

        let data: Vec<u8> = std::iter::repeat(0)
            .take(10 * initial_mmap_size)
            .collect();
        lmdb.add("a", &Value::Json(&String::from_utf8(data).unwrap()))
            .expect("could not write to lmdb");
    }
}