use std::{
collections::HashMap,
env::current_dir,
fs::create_dir_all,
ops::Deref,
path::{Path, PathBuf},
sync::{Arc, RwLock},
};
use dirs::home_dir;
use dunce::canonicalize;
use lmdb::{
self, open as OpenFlag, open::Flags as OpenFlags, Cursor, CursorIter, Database,
DatabaseOptions, EnvBuilder, Environment, MaybeOwned, ReadTransaction,
};
use ron::de::from_str as from_db_name;
use serde::{Deserialize, Serialize};
use supercow::{ext::ConstDeref, NonSyncSupercow, Supercow};
use super::{
Collection, CollectionDef, Enumerable, IndexDef, Pool, Result, ResultWrap, Serial,
SerialGenerator,
};
/// Definition of one named database inside the LMDB environment.
///
/// `load_databases` parses every key of the environment's main database back into a
/// `DatabaseDef` with RON (`from_db_name`), so presumably each named database is named by
/// its RON-serialized definition — confirm against the code that creates databases.
/// Variants are renamed to single letters (`c` / `i`), presumably to keep those names short.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) enum DatabaseDef {
/// Definition of a collection.
#[serde(rename = "c")]
Collection(CollectionDef),
/// Definition of an index.
#[serde(rename = "i")]
Index(IndexDef),
}
/// B-tree statistics of the environment, as returned by `Storage::get_stats`.
///
/// Each field is a renamed copy of the corresponding `lmdb::Stat` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stats {
    /// Database page size (`psize`).
    pub page_size: u32,
    /// Depth of the B-tree (`depth`).
    pub btree_depth: u32,
    /// Number of branch pages (`branch_pages`).
    pub branch_pages: usize,
    /// Number of leaf pages (`leaf_pages`).
    pub leaf_pages: usize,
    /// Number of overflow pages (`overflow_pages`).
    pub overflow_pages: usize,
    /// Number of data entries (`entries`).
    pub data_entries: usize,
}

impl From<lmdb::Stat> for Stats {
    /// Copies the raw LMDB statistics into the public, descriptively named `Stats`.
    fn from(stat: lmdb::Stat) -> Self {
        Stats {
            data_entries: stat.entries,
            overflow_pages: stat.overflow_pages,
            leaf_pages: stat.leaf_pages,
            branch_pages: stat.branch_pages,
            btree_depth: stat.depth,
            page_size: stat.psize,
        }
    }
}
/// Environment information, as returned by `Storage::get_info`.
///
/// Each field is a renamed copy of the corresponding `lmdb::EnvInfo` field; the map
/// address reported by LMDB is intentionally not exposed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Info {
    /// Size of the memory map (`mapsize`).
    pub map_size: usize,
    /// Last used page number (`last_pgno`).
    pub last_page: usize,
    /// Last transaction id (`last_txnid`).
    pub last_transaction: usize,
    /// Maximum number of reader slots (`maxreaders`).
    pub max_readers: u32,
    /// Reader slots used (`numreaders`).
    pub num_readers: u32,
}

impl From<lmdb::EnvInfo> for Info {
    /// Copies the relevant LMDB environment info into the public `Info` shape.
    fn from(info: lmdb::EnvInfo) -> Self {
        Info {
            num_readers: info.numreaders,
            max_readers: info.maxreaders,
            last_transaction: info.last_txnid,
            last_page: info.last_pgno,
            map_size: info.mapsize,
        }
    }
}
/// User-facing configuration of an LMDB environment.
///
/// Every field is optional (`None` = not specified). Sizing fields feed the
/// `EnvBuilder` (defaults: 16 MiB map, 126 readers, 128 named databases); each boolean
/// field corresponds to one LMDB open flag (see `Options::fill_flags`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Options {
/// Memory-map size in bytes; defaults to `16 << 20` when opening. Also re-applied to an
/// already-open environment.
#[serde(default)]
map_size: Option<usize>,
/// Maximum number of reader slots; defaults to 126.
#[serde(default)]
max_readers: Option<u32>,
/// Maximum number of named databases; defaults to 128.
#[serde(default)]
max_dbs: Option<u32>,
/// LMDB `MAPASYNC` flag.
#[serde(default)]
map_async: Option<bool>,
/// LMDB `NOLOCK` flag.
#[serde(default)]
no_lock: Option<bool>,
/// LMDB `NOMEMINIT` flag.
#[serde(default)]
no_mem_init: Option<bool>,
/// LMDB `NOMETASYNC` flag.
#[serde(default)]
no_meta_sync: Option<bool>,
/// LMDB `NORDAHEAD` flag.
#[serde(default)]
no_read_ahead: Option<bool>,
/// LMDB `NOSUBDIR` flag (the path names the data file rather than a directory).
#[serde(default)]
no_sub_dir: Option<bool>,
/// LMDB `NOSYNC` flag.
#[serde(default)]
no_sync: Option<bool>,
/// LMDB `NOTLS` flag.
#[serde(default)]
no_tls: Option<bool>,
/// LMDB `RDONLY` flag.
#[serde(default)]
read_only: Option<bool>,
/// LMDB `WRITEMAP` flag.
#[serde(default)]
write_map: Option<bool>,
}
impl Options {
    /// Creates an `EnvBuilder` configured with the sizing options.
    ///
    /// Unset values fall back to a 16 MiB map (`16 << 20` bytes), 126 reader slots and
    /// 128 named databases.
    ///
    /// # Errors
    /// Fails if the builder cannot be created or rejects one of the sizes.
    fn env_builder(&self) -> Result<EnvBuilder> {
        let mut bld = EnvBuilder::new()?;
        bld.set_mapsize(self.map_size.unwrap_or(16 << 20))
            .wrap_err()?;
        bld.set_maxreaders(self.max_readers.unwrap_or(126))
            .wrap_err()?;
        bld.set_maxdbs(self.max_dbs.unwrap_or(128)).wrap_err()?;
        Ok(bld)
    }

    /// Flags to pass when opening a new environment: every flag option set to `Some(true)`.
    fn open_flags(&self) -> OpenFlags {
        self.fill_flags(None)
    }

    /// Applies these options to an environment that is already open.
    ///
    /// `map_size` (if set) is applied as-is. Flag options are applied only for the flags
    /// LMDB allows to change after `mdb_env_open` (`MAPASYNC`, `NOMEMINIT`, `NOMETASYNC`,
    /// `NOSYNC`): options set to `Some(true)` are switched on, options set to `Some(false)`
    /// are switched off. The remaining flags are fixed at open time; passing them to
    /// `mdb_env_set_flags` makes LMDB return `EINVAL`, so they are skipped here.
    ///
    /// # Errors
    /// Fails if LMDB rejects the new map size or a flag change.
    fn config_env(&self, env: &Environment) -> Result<()> {
        if let Some(val) = self.map_size {
            // SAFETY: NOTE(review) LMDB only allows resizing the map when no transaction
            // is active in this process; nothing here enforces that — confirm with callers.
            unsafe {
                env.set_mapsize(val).wrap_err()?;
            }
        }
        let changeable = Self::changeable_flags();
        let enable = self.fill_flags(Some(true)) & changeable;
        let disable = self.fill_flags(Some(false)) & changeable;
        // SAFETY: NOTE(review) LMDB leaves concurrent flag changes undefined; presumably
        // callers do not reconfigure the same environment from several threads at once.
        unsafe {
            if !enable.is_empty() {
                env.set_flags(enable, true).wrap_err()?;
            }
            if !disable.is_empty() {
                env.set_flags(disable, false).wrap_err()?;
            }
        }
        Ok(())
    }

    /// The subset of open flags that `mdb_env_set_flags` accepts on an open environment.
    fn changeable_flags() -> OpenFlags {
        OpenFlag::MAPASYNC | OpenFlag::NOMEMINIT | OpenFlag::NOMETASYNC | OpenFlag::NOSYNC
    }

    /// Pairs every boolean option with the LMDB flag it controls.
    fn flag_options(&self) -> [(Option<bool>, OpenFlags); 10] {
        [
            (self.map_async, OpenFlag::MAPASYNC),
            (self.no_lock, OpenFlag::NOLOCK),
            (self.no_mem_init, OpenFlag::NOMEMINIT),
            (self.no_meta_sync, OpenFlag::NOMETASYNC),
            (self.no_read_ahead, OpenFlag::NORDAHEAD),
            (self.no_sub_dir, OpenFlag::NOSUBDIR),
            (self.no_sync, OpenFlag::NOSYNC),
            (self.no_tls, OpenFlag::NOTLS),
            (self.read_only, OpenFlag::RDONLY),
            (self.write_map, OpenFlag::WRITEMAP),
        ]
    }

    /// Collects the flags whose option is explicitly set to `onoff` (`None` means `true`).
    ///
    /// Unset (`None`) options never contribute. The previous implementation called
    /// `flags.set(flag, false)` on an empty set for `Some(false)`, which is a no-op, so
    /// flags requested off were never reported and could not be cleared.
    fn fill_flags(&self, onoff: Option<bool>) -> OpenFlags {
        let wanted = Some(onoff.unwrap_or(true));
        self.flag_options()
            .iter()
            .filter(|(value, _)| *value == wanted)
            .fold(OpenFlags::empty(), |acc, &(_, flag)| acc | flag)
    }
}
/// Shared state behind every [`Storage`] handle.
pub(crate) struct StorageData {
/// Canonical path of the environment; also the key used with `Pool::put` / `Pool::del`.
path: PathBuf,
/// The open LMDB environment.
env: Environment,
/// Serial source for definitions; seeded from the stored definitions in `load_collections`.
gen: SerialGenerator,
/// Collections known to this storage.
collections: RwLock<Vec<Collection>>,
}
/// Cheaply clonable handle to an open storage; all clones share one `StorageData`.
///
/// NOTE(review): collections are built with `Collection::new(self.clone(), ..)` and kept in
/// `StorageData::collections`; if `Collection` stores that `Storage`, this is an `Arc`
/// cycle and the data is never freed — confirm.
#[derive(Clone)]
pub struct Storage(Arc<StorageData>);
impl Storage {
    /// Opens the storage at `path`, reusing an already-open instance when one exists.
    ///
    /// `path` is resolved with `realpath` (absolute and canonicalized). If the [`Pool`]
    /// already holds a storage for that path, `opts` is applied to its environment via
    /// `Options::config_env` and the shared instance is returned; otherwise a new
    /// environment is opened and registered.
    ///
    /// NOTE(review): `Pool::get` followed by `open` is not atomic; two threads opening the
    /// same new path at once could both reach `open` — confirm whether `Pool` prevents it.
    ///
    /// # Errors
    /// Fails if the path cannot be resolved, the pool is unavailable, or the environment
    /// cannot be configured / opened.
    pub fn new<P: AsRef<Path>>(path: P, opts: Options) -> Result<Self> {
        let path = realpath(path.as_ref())?;
        if let Some(storage) = Pool::get(&path)? {
            opts.config_env(&storage.env)?;
            Ok(Storage(storage))
        } else {
            Self::open(path, opts)
        }
    }

    /// Opens a fresh environment at `path`, loads its collections and registers it in the pool.
    fn open(path: PathBuf, opts: Options) -> Result<Self> {
        let env = open_env(&path, opts)?;
        let gen = SerialGenerator::new();
        let collections = RwLock::new(Vec::new());
        let storage = Storage(Arc::new(StorageData {
            path: path.clone(),
            env,
            gen,
            collections,
        }));
        storage.load_collections()?;
        Pool::put(path, &storage.0)?;
        Ok(storage)
    }

    /// Rebuilds the in-memory collection list from the definitions stored in the main
    /// (unnamed) database and seeds the serial generator with the highest serial found.
    fn load_collections(&self) -> Result<()> {
        let env = &self.0.env;
        let db = Database::open(env, None, &DatabaseOptions::defaults()).wrap_err()?;
        let (last_serial, db_def) = load_databases(env, &db)?;
        self.0.gen.set(last_serial);
        let mut collections = self.0.collections.write().wrap_err()?;
        *collections = db_def
            .into_iter()
            .map(|(def, index_defs)| Collection::new(self.clone(), def, index_defs))
            .collect::<Result<Vec<_>>>()?;
        Ok(())
    }

    /// Passes `data` through this storage's serial generator (presumably assigning it the
    /// next serial — see `SerialGenerator::enumerate`).
    pub(crate) fn enumerate<E: Enumerable>(&self, data: E) -> E {
        self.0.gen.enumerate(data)
    }

    /// Returns whether a collection called `name` is currently known.
    pub fn has_collection<N: AsRef<str>>(&self, name: N) -> Result<bool> {
        let name = name.as_ref();
        let collections = self.0.collections.read().wrap_err()?;
        Ok(collections
            .iter()
            .any(|collection| collection.name() == name))
    }

    /// Returns the collection called `name`, creating it if it does not exist yet.
    ///
    /// The lookup is retried under the write lock before creating, so concurrent callers
    /// asking for the same new name get the same collection instead of two duplicates.
    pub fn collection<N: AsRef<str>>(&self, name: N) -> Result<Collection> {
        let name = name.as_ref();
        // Fast path: an existing collection only needs the shared read lock.
        {
            let collections = self.0.collections.read().wrap_err()?;
            if let Some(collection) = collections
                .iter()
                .find(|collection| collection.name() == name)
            {
                return Ok(collection.clone());
            }
        }
        let mut collections = self.0.collections.write().wrap_err()?;
        // Another thread may have created it between dropping the read lock and
        // acquiring the write lock.
        if let Some(collection) = collections
            .iter()
            .find(|collection| collection.name() == name)
        {
            return Ok(collection.clone());
        }
        // `load_collections` also calls `Collection::new` while holding this write lock,
        // so creating the collection here does not re-enter the lock.
        let collection = Collection::new(
            self.clone(),
            self.enumerate(CollectionDef::new(name)),
            Vec::new(),
        )?;
        collections.push(collection.clone());
        Ok(collection)
    }

    /// Removes the collection called `name` and marks it for deletion.
    ///
    /// Returns `Ok(false)` when no such collection exists. Lookup and removal happen under
    /// one write lock so the position cannot be invalidated by a concurrent change.
    ///
    /// # Errors
    /// Fails if the lock is poisoned or `Collection::to_delete` fails (the collection has
    /// already been removed from the list at that point).
    pub fn drop_collection<N: AsRef<str>>(&self, name: N) -> Result<bool> {
        let name = name.as_ref();
        let mut collections = self.0.collections.write().wrap_err()?;
        match collections
            .iter()
            .position(|collection| collection.name() == name)
        {
            Some(pos) => {
                let collection = collections.remove(pos);
                collection.to_delete()?;
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Names of all currently known collections.
    pub fn get_collections(&self) -> Result<Vec<String>> {
        let collections = self.0.collections.read().wrap_err()?;
        Ok(collections
            .iter()
            .map(|collection| collection.name().into())
            .collect())
    }

    /// Environment statistics reported by LMDB.
    pub fn get_stats(&self) -> Result<Stats> {
        self.0.env.stat().map(Stats::from).wrap_err()
    }

    /// Environment information reported by LMDB.
    pub fn get_info(&self) -> Result<Info> {
        self.0.env.info().map(Info::from).wrap_err()
    }

    /// Paths of the storages currently registered in the pool (as listed by `Pool::lst`).
    pub fn openned() -> Result<Vec<PathBuf>> {
        Pool::lst()
    }
}
impl Drop for Storage {
    /// Deregisters this storage's path from the [`Pool`]; failures are printed to stderr
    /// because `drop` cannot return them.
    ///
    /// NOTE(review): `Storage` is `Clone`, and this runs for every clone that is dropped,
    /// not only the last one — confirm that `Pool::del` accounts for remaining handles.
    fn drop(&mut self) {
        let path = &self.0.path;
        match Pool::del(path) {
            Ok(_) => {}
            Err(err) => eprintln!("Error when dropping storage: {}", err),
        }
    }
}
impl Deref for Storage {
type Target = Environment;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0.env
}
}
// SAFETY: `const_deref` returns a reference into the `Arc`-allocated `StorageData`, not into
// the `Storage` handle itself, so the reference is identical for every call and its address
// does not change when the handle is moved or cloned. NOTE(review): this is the stability
// requirement of supercow's `ConstDeref`; re-check it against the supercow docs on upgrade.
unsafe impl ConstDeref for Storage {
type Target = Environment;
#[inline]
fn const_deref(&self) -> &Self::Target {
&self.0.env
}
}
/// Converts a `Storage` into a shared `Supercow` over its environment
/// (presumably for lmdb APIs that take `Into<Supercow<Environment>>`).
///
/// NOTE(review): clippy prefers implementing `From<Storage>` for the target instead of `Into`.
impl<'env> Into<Supercow<'env, Environment>> for Storage {
fn into(self) -> Supercow<'env, Environment> {
Supercow::shared(self)
}
}
/// Same as the `Supercow` conversion, for the non-`Sync` `NonSyncSupercow` variant.
impl<'env> Into<NonSyncSupercow<'env, Environment>> for Storage {
fn into(self) -> NonSyncSupercow<'env, Environment> {
Supercow::shared(self)
}
}
type Definitions = Vec<(CollectionDef, Vec<IndexDef>)>;
fn load_databases(env: &Environment, db: &Database) -> Result<(Serial, Definitions)> {
let txn = ReadTransaction::new(env).wrap_err()?;
let cursor = txn.cursor(db).wrap_err()?;
let access = txn.access();
let mut defs: HashMap<String, (CollectionDef, Vec<IndexDef>)> = HashMap::new();
let mut last_serial: Serial = 0;
for res in CursorIter::new(
MaybeOwned::Owned(cursor),
&access,
|c, a| c.first(a),
Cursor::next::<str, [u8]>,
)
.wrap_err()?
.map(|res| {
res.wrap_err()
.and_then(|(key, _val)| from_db_name(key).wrap_err())
}) {
match res {
Ok(DatabaseDef::Collection(def)) => {
last_serial = usize::max(last_serial, def.0);
let entry = defs
.entry(def.1.clone())
.or_insert_with(|| (def.clone(), Vec::new()));
entry.0 = def;
}
Ok(DatabaseDef::Index(def)) => {
last_serial = usize::max(last_serial, def.0);
defs.entry(def.1.clone())
.or_insert_with(|| (CollectionDef::new(&def.1), Vec::new()))
.1
.push(def);
}
Err(e) => return Err(e),
}
}
Ok((
last_serial,
defs.into_iter().map(|(_key, val)| val).collect(),
))
}
fn open_env(path: &Path, opts: Options) -> Result<Environment> {
let path = path.to_str().ok_or("Invalid db path").wrap_err()?;
let bld = opts.env_builder()?;
let flags = opts.open_flags();
create_dir_all(&path).wrap_err()?;
unsafe { bld.open(path, flags, 0o600) }.wrap_err()
}
/// Makes `path` absolute and canonicalizes it with `safe_canonicalize`.
///
/// Rooted paths are used as given. A leading `~` component is replaced by the user's home
/// directory, and any other relative path is resolved against the current directory.
///
/// # Errors
/// Fails if the home or current directory cannot be determined, or canonicalization fails.
fn realpath(path: &Path) -> Result<PathBuf> {
    if path.has_root() {
        return safe_canonicalize(path);
    }
    let absolute = match path.strip_prefix("~") {
        Ok(rest) => home_dir()
            .ok_or("Unable to determine home directory")
            .wrap_err()?
            .join(rest),
        Err(_) => current_dir().wrap_err()?.join(path),
    };
    safe_canonicalize(&absolute)
}
fn safe_canonicalize(path: &Path) -> Result<PathBuf> {
match canonicalize(path) {
Ok(canonical) => Ok(canonical),
Err(error) => {
if let Some(parent) = path.parent() {
let child = path.strip_prefix(parent).unwrap();
safe_canonicalize(parent).map(|canonical_parent| canonical_parent.join(child))
} else {
Err(error).wrap_err()
}
}
}
}