#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
use fs2::FileExt;
use glob::glob;
use jsonschema::{Draft, JSONSchema};
use lru::LruCache;
use openssl::sha::Sha256;
use regex::Regex;
use serde::{de::Error as deError, Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use std::fmt;
use std::fs;
use std::io;
use std::io::Write;
use std::path::Path;
use std::process;
use std::time::{Duration, Instant, SystemTime, SystemTimeError};
use log::{error, trace, warn};
#[macro_use]
extern crate lazy_static;
pub const SERVER_ID: &str = "yedb-altt-rs";
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
pub const ENGINE_VERSION: u8 = 1;
pub const DEFAULT_CACHE_SIZE: usize = 1000;
const SLEEP_STEP: Duration = Duration::from_millis(50);
lazy_static! {
static ref RE_BAK: Regex = Regex::new(r"\.bak(\d)*$").unwrap();
}
trait ExplainValue {
fn get_len(&self) -> Option<u64>;
fn get_type(&self) -> String;
}
impl ExplainValue for Value {
fn get_len(&self) -> Option<u64> {
match self {
Value::Null | Value::Bool(_) | Value::Number(_) => None,
Value::String(v) => Some(v.len() as u64),
Value::Array(v) => Some(v.len() as u64),
Value::Object(v) => Some(v.len() as u64),
}
}
fn get_type(&self) -> String {
match self {
Value::Null => "null".to_owned(),
Value::Bool(_) => "boolean".to_owned(),
Value::Number(_) => "number".to_owned(),
Value::String(_) => "string".to_owned(),
Value::Array(_) => "array".to_owned(),
Value::Object(_) => "object".to_owned(),
}
}
}
#[path = "common.rs"]
pub mod common;
pub use common::{DBInfo, Error, ErrorKind, KeyExplained};
#[cfg(feature = "client-sync")]
#[path = "client.rs"]
pub mod client;
#[cfg(feature = "client-sync")]
pub use client::YedbClient;
#[cfg(feature = "client-async")]
#[path = "client_async.rs"]
pub mod client_async;
#[cfg(feature = "client-async")]
pub use client_async::{YedbClientAsync, YedbClientAsyncExt, YedbClientPoolAsync};
#[cfg(feature = "client-busrt-async")]
#[path = "client_busrt_async.rs"]
pub mod client_busrt_async;
#[cfg(feature = "client-busrt-async")]
pub use client_busrt_async::YedbClientBusRtAsync;
#[cfg(feature = "client-local-async")]
#[path = "client_local_async.rs"]
pub mod client_local_async;
#[cfg(feature = "client-local-async")]
pub use client_local_async::YedbClientLocalAsync;
#[cfg(any(feature = "server", feature = "server-embed"))]
#[path = "server.rs"]
pub mod server;
#[derive(Debug)]
enum DataKey<'a> {
Name(&'a str),
File(&'a str),
}
impl<'a> DataKey<'a> {
#[allow(dead_code)]
fn is_file(&self) -> bool {
match self {
DataKey::Name(_) => false,
DataKey::File(_) => true,
}
}
fn is_name(&self) -> bool {
match self {
DataKey::Name(_) => true,
DataKey::File(_) => false,
}
}
fn get(&self) -> &str {
match self {
DataKey::Name(v) | DataKey::File(v) => v,
}
}
}
macro_rules! get_engine {
($e:expr) => {
if let Some(engine) = $e.engine {
engine
} else {
return Err(Error::new(
ErrorKind::NotOpened,
"The database is not opened",
));
}
};
}
macro_rules! impl_err_io {
($e: path) => {
impl From<$e> for Error {
fn from(e: $e) -> Self {
Error::new(ErrorKind::IOError, e)
}
}
};
}
macro_rules! impl_err_data {
($e: path) => {
impl From<$e> for Error {
fn from(e: $e) -> Self {
Error::new(ErrorKind::DataError, e)
}
}
};
}
impl_err_io!(glob::PatternError);
impl_err_io!(fs_extra::error::Error);
impl_err_io!(io::Error);
impl_err_data!(rmp_serde::decode::Error);
impl_err_data!(rmp_serde::encode::Error);
impl_err_data!(serde_cbor::Error);
impl_err_data!(serde_json::Error);
impl_err_data!(serde_yaml::Error);
impl_err_data!(hex::FromHexError);
impl<'a> From<jsonschema::ValidationError<'a>> for Error {
fn from(e: jsonschema::ValidationError<'a>) -> Self {
Error::new(ErrorKind::SchemaValidationError, e)
}
}
impl From<SystemTimeError> for Error {
fn from(e: SystemTimeError) -> Self {
Error::new(ErrorKind::Other, e)
}
}
macro_rules! timestamp_ns {
() => {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_nanos() as u64
};
}
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum SerializationEngine {
Json,
Msgpack,
Cbor,
Yaml,
}
impl Serialize for SerializationEngine {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.to_string().as_str())
}
}
impl fmt::Display for SerializationEngine {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
match self {
SerializationEngine::Json => "json",
SerializationEngine::Msgpack => "msgpack",
SerializationEngine::Cbor => "cbor",
SerializationEngine::Yaml => "yaml",
}
)
}
}
impl SerializationEngine {
pub fn as_u8(&self) -> u8 {
match self {
SerializationEngine::Json => 1,
SerializationEngine::Msgpack => 2,
SerializationEngine::Cbor => 3,
SerializationEngine::Yaml => 4,
}
}
pub fn from_u8(fmt: u8) -> Self {
match fmt {
1 => SerializationEngine::Json,
2 => SerializationEngine::Msgpack,
3 => SerializationEngine::Cbor,
4 => SerializationEngine::Yaml,
_ => unimplemented!(),
}
}
pub fn from_string(fmt: &str) -> Result<Self, Error> {
match fmt {
"json" => Ok(SerializationEngine::Json),
"msgpack" => Ok(SerializationEngine::Msgpack),
"cbor" => Ok(SerializationEngine::Cbor),
"yaml" => Ok(SerializationEngine::Yaml),
_ => Err(Error::new(ErrorKind::UnsupportedFormat, fmt)),
}
}
pub fn suffix(&self, checksums: bool) -> String {
let mut sfx = match self {
SerializationEngine::Json => ".json".to_owned(),
SerializationEngine::Msgpack => ".mp".to_owned(),
SerializationEngine::Cbor => ".cb".to_owned(),
SerializationEngine::Yaml => ".yml".to_owned(),
};
if checksums {
sfx += "c";
}
sfx
}
pub fn is_binary(&self) -> bool {
match self {
SerializationEngine::Json | SerializationEngine::Yaml => false,
SerializationEngine::Cbor | SerializationEngine::Msgpack => true,
}
}
pub fn deserialize(&self, buf: &[u8]) -> Result<Value, Error> {
Ok(match self {
SerializationEngine::Msgpack => rmp_serde::from_slice(buf)?,
SerializationEngine::Cbor => serde_cbor::from_slice(buf)?,
SerializationEngine::Json => serde_json::from_slice(buf)?,
SerializationEngine::Yaml => serde_yaml::from_slice(buf)?,
})
}
pub fn serialize(&self, value: &Value) -> Result<Vec<u8>, Error> {
Ok(match self {
SerializationEngine::Msgpack => rmp_serde::to_vec_named(value)?,
SerializationEngine::Cbor => serde_cbor::to_vec(value)?,
SerializationEngine::Json => {
let mut v = serde_json::to_vec(value)?;
if v.is_empty() || v[v.len() - 1] != 0x0A_u8 {
v.push(0x0A_u8);
}
v
}
SerializationEngine::Yaml => {
let mut v = serde_yaml::to_vec(value)?;
if v.is_empty() || v[v.len() - 1] != 0x0A_u8 {
v.push(0x0A_u8);
}
v
}
})
}
}
fn de_fmt<'de, D>(deserializer: D) -> Result<Option<SerializationEngine>, D::Error>
where
D: Deserializer<'de>,
{
match &String::deserialize(deserializer) {
Ok(v) => match SerializationEngine::from_string(v) {
Ok(v) => Ok(Some(v)),
Err(_) => Ok(None),
},
Err(e) => Err(D::Error::custom(e)),
}
}
#[derive(Deserialize, Serialize, Debug, Copy, Clone, PartialEq)]
struct Engine {
#[serde(
deserialize_with = "de_fmt",
rename(serialize = "fmt", deserialize = "fmt")
)]
se: Option<SerializationEngine>,
created: u64,
version: u8,
checksums: bool,
}
impl Engine {
fn get_suffix(&self) -> String {
self.se.unwrap().suffix(self.checksums)
}
fn is_serialization_binary(&self) -> bool {
self.se.unwrap().is_binary()
}
}
#[derive(Serialize, Debug)]
pub struct ServerInfo {
pub name: String,
pub version: u8,
}
impl Default for ServerInfo {
fn default() -> Self {
Self::new()
}
}
impl ServerInfo {
pub fn new() -> Self {
Self {
name: "yedb".to_owned(),
version: ENGINE_VERSION,
}
}
}
fn sync_dir(dir: &str) -> Result<(), Error> {
match fs::File::open(dir) {
Ok(dh) => {
trace!("syncing dir {}", dir);
dh.sync_all()?;
}
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
trace!("dir {} not found, skipping sync", dir);
}
Err(e) => return Err(Error::new(ErrorKind::IOError, e)),
};
Ok(())
}
fn lock_ex(fh: &fs::File, timeout: Duration) -> Result<bool, Error> {
let start = Instant::now();
let mut locked_instantly = true;
trace!("locking the database");
loop {
match fh.try_lock_exclusive() {
Ok(()) => return Ok(locked_instantly),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
locked_instantly = false;
std::thread::sleep(SLEEP_STEP);
if Instant::now() - start > timeout {
return Err(Error::new(ErrorKind::TimeoutError, "lock timeout"));
}
continue;
}
Err(e) => return Err(Error::new(ErrorKind::IOError, e)),
}
}
}
fn fmt_key(key: &str) -> String {
let mut x = 0;
for c in key.chars() {
if c == '/' {
x += 1;
} else {
break;
}
}
let name = match x {
0 => key.to_owned(),
_ => key[x..key.len()].to_owned(),
};
name.replace("../", "")
}
fn create_dirs(basepath: &str, dirname: &str) -> Result<Vec<String>, Error> {
let parts = dirname.split('/');
let mut created: Vec<String> = Vec::new();
let mut cdir = basepath.to_string();
for p in parts {
cdir += "/";
cdir += p;
match fs::create_dir(&cdir) {
Ok(_) => created.push(cdir.clone()),
Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => {}
Err(e) => {
return Err(Error::new(
ErrorKind::IOError,
format!("Unable to create directory {}: {}", cdir, e),
))
}
};
}
Ok(created)
}
struct KeyInfo {
key_file: String,
checksum: Option<[u8; 32]>,
metadata: fs::Metadata,
stime: Option<u64>,
}
#[allow(clippy::struct_excessive_bools)]
pub struct Database {
path: String,
key_path: String,
lock_path: String,
pub auto_repair: bool,
pub auto_flush: bool,
pub write_modified_only: bool,
pub timeout: Duration,
pub lock_ex: bool,
pub auto_bak: u64,
pub skip_bak: Vec<String>,
pub strict_schema: bool,
default_fmt: SerializationEngine,
default_checksums: bool,
meta_path: String,
trash_path: String,
engine: Option<Engine>,
cache: Box<LruCache<String, Value>>,
repair_recommended: bool,
lock_fh: Option<fs::File>,
}
impl Drop for Database {
fn drop(&mut self) {
if self.engine.is_some() {
let _r = self.close();
}
}
}
impl Default for Database {
fn default() -> Self {
Self::new()
}
}
impl Database {
pub fn new() -> Self {
Database {
path: String::new(),
key_path: String::new(),
default_fmt: SerializationEngine::Json,
default_checksums: true,
auto_repair: true,
auto_flush: true,
auto_bak: 0,
skip_bak: Vec::new(),
strict_schema: false,
lock_ex: true,
write_modified_only: true,
timeout: Duration::from_secs(5),
lock_path: String::new(),
meta_path: String::new(),
trash_path: String::new(),
cache: Box::new(LruCache::new(DEFAULT_CACHE_SIZE)),
engine: None,
repair_recommended: false,
lock_fh: None,
}
}
pub fn is_open(&self) -> bool {
self.engine.is_some()
}
fn need_skip_bak(&self, key: &str) -> bool {
for k in &self.skip_bak {
let l = k.len();
if k == key || (key.starts_with(k) && key.get(l..=l).map_or(false, |s| s == "/")) {
return true;
}
}
false
}
fn need_backup(&self, key: &str) -> bool {
self.auto_bak > 0
&& !key.starts_with(".schema/")
&& key != ".schema"
&& !self.need_skip_bak(key)
}
pub fn set_default_fmt(&mut self, fmt: &str, checksums: bool) -> Result<(), Error> {
trace!(
"setting the default format to {} with checksums={}",
fmt,
checksums
);
if self.engine.is_some() {
Err(Error::new(
ErrorKind::Busy,
"the database is already opened",
))
} else {
self.default_fmt = match SerializationEngine::from_string(fmt) {
Ok(v) => v,
Err(e) => return Err(e),
};
self.default_checksums = checksums;
Ok(())
}
}
pub fn set_db_path(&mut self, path: &str) -> Result<(), Error> {
trace!("setting the DB path to {}", path);
if self.engine.is_some() {
Err(Error::new(ErrorKind::Busy, "the database is opened"))
} else {
self.path = path.to_owned();
self.key_path = self.path.clone() + "/keys";
trace!("key path set to {}", self.key_path);
self.trash_path = self.key_path.clone() + "/.trash";
self.lock_path = path.to_string() + "/db.lock";
trace!("lock file set to {}", self.lock_path);
self.meta_path = path.to_string() + "/.yedb";
trace!("lock meta file set to {}", self.meta_path);
Ok(())
}
}
pub fn set_cache_size(&mut self, size: usize) {
trace!("setting the cache size to {} keys", size);
self.cache.resize(size);
}
pub fn set_lock_path(&mut self, path: &str) -> Result<(), Error> {
trace!("setting lock path to {}", path);
if self.engine.is_some() {
Err(Error::new(ErrorKind::Busy, "the database is opened"))
} else {
self.lock_path = path.to_owned();
Ok(())
}
}
pub fn open(&mut self) -> Result<ServerInfo, Error> {
trace!("opening the database, path: {}", self.path);
if self.path.is_empty() {
return Err(Error::new(ErrorKind::NotInitialized, "db path not set"));
}
if self.engine.is_some() {
return Err(Error::new(
ErrorKind::Busy,
"the database is already opened",
));
}
self.repair_recommended = false;
self.cache.clear();
let db_engine: Option<Engine>;
match fs::read_to_string(self.meta_path.clone()) {
Ok(buf) => {
let engine: Engine = serde_json::from_str(&buf)?;
if engine.se.is_none() {
return Err(Error::new(
ErrorKind::UnsupportedFormat,
"unsupoorted database format",
));
} else if engine.version > ENGINE_VERSION {
return Err(Error::new(ErrorKind::UnsupportedVersion, engine.version));
}
db_engine = Some(engine);
trace!("database opened successfully");
}
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
trace!("no database found, creating new");
if Path::new(&self.path).exists() {
return Err(Error::new(
ErrorKind::Other,
"the directory already exists but no meta file found",
));
}
let engine = Engine {
se: Some(self.default_fmt),
created: timestamp_ns!(),
version: ENGINE_VERSION,
checksums: self.default_checksums,
};
fs::create_dir_all(&self.path)?;
let mut fh = fs::File::create(&self.meta_path)?;
fh.write_all(serde_json::to_string(&engine)?.as_bytes())?;
if self.auto_flush {
fh.flush()?;
fh.sync_all()?;
}
db_engine = Some(engine);
trace!("database created successfully");
}
Err(e) => {
return Err(Error::new(ErrorKind::IOError, e));
}
}
if self.lock_ex {
let lock_path = Path::new(&self.lock_path);
if lock_path.exists() {
let lock_fh = fs::File::open(&self.lock_path)?;
match lock_ex(&lock_fh, self.timeout) {
Ok(v) => self.repair_recommended = v,
Err(e) => return Err(e),
};
}
let mut lock_fh = fs::File::create(&self.lock_path)?;
match lock_ex(&lock_fh, self.timeout) {
Ok(_) => {}
Err(e) => return Err(e),
};
lock_fh.write_all(process::id().to_string().as_bytes())?;
if self.auto_flush {
lock_fh.flush()?;
lock_fh.sync_all()?;
}
self.lock_fh = Some(lock_fh);
}
self.engine = db_engine;
if self.repair_recommended {
warn!("warning: database repair is recommended");
if self.auto_repair {
warn!("warning: starting auto-repair");
match self.repair() {
Ok(_) => warn!("auto-repair completed"),
Err(e) => error!("auto-repair failed {}", e),
};
}
} else {
trace!("the database is clean, no repairing recommended");
}
let _r = fs::create_dir_all(&self.trash_path);
let _r = fs::create_dir_all(&self.key_path);
if self.auto_flush {
sync_dir(&self.path)?;
}
Ok(ServerInfo::new())
}
pub fn close(&mut self) -> Result<(), Error> {
trace!("closing the database {}", self.path);
if self.engine.is_none() {
return Err(Error::new(
ErrorKind::NotOpened,
"the database is not opened",
));
}
self.engine = None;
if self.lock_fh.is_some() {
self.lock_fh = None;
fs::remove_file(&self.lock_path)?;
}
Ok(())
}
fn find_schema_key(&mut self, key: &str) -> Result<Option<String>, Error> {
if key.starts_with(".schema/") || key == ".schema" {
trace!("schema key for {} is virtual", key);
return Ok(Some("!JSON Schema Draft-7".to_owned()));
}
let mut schema_key = ".schema/".to_owned() + key;
loop {
if self.key_exists(&schema_key)? {
trace!("found Schema schema_key for {} at {}", key, schema_key);
return Ok(Some(schema_key));
}
match schema_key.rfind('/') {
Some(pos) => {
schema_key = schema_key[..pos].to_string();
}
None => {
break;
}
};
}
Ok(None)
}
fn validate_schema(&mut self, key: &str, value: &Value) -> Result<(), Error> {
trace!("validating schema for {}", key);
if key.starts_with(".schema/") || key == ".schema" {
JSONSchema::options()
.with_draft(Draft::Draft7)
.compile(value)?;
Ok(())
} else {
match self.find_schema_key(key)? {
Some(schema_key) => {
let schema = self.get_key_data(DataKey::Name(&schema_key), false)?.0;
let compiled = JSONSchema::options()
.with_draft(Draft::Draft7)
.compile(&schema)?;
compiled.validate(value).map_err(|e| {
let mut err: String = String::new();
for error in e {
if !err.is_empty() {
err += "\n";
}
err += error.to_string().as_str();
}
Error::new(ErrorKind::SchemaValidationError, err)
})
}
None => {
if self.strict_schema {
Err(Error::new(
ErrorKind::SchemaValidationError,
"schema not defined",
))
} else {
Ok(())
}
}
}
}
}
fn set_key_data(
&mut self,
key: &str,
value: Value,
stime: Option<u64>,
ignore_schema: bool,
) -> Result<(), Error> {
trace!("setting value for key {}", key);
let key = fmt_key(key);
if key.is_empty() {
return Err(Error::new(ErrorKind::KeyNotFound, key));
}
let engine = get_engine!(self);
let mut dts: Vec<String> = Vec::new();
if !ignore_schema {
self.validate_schema(&key, &value)?;
}
let key_file = self.key_path.clone() + "/" + key.as_str() + engine.get_suffix().as_str();
if self.write_modified_only {
match self.key_get(&key) {
Ok(v) => {
if v == value {
trace!("key {} not modified, skipping set", key);
return Ok(());
}
}
Err(_) => {
trace!("key {} not cached", key);
}
}
}
let key_dir = match key.rfind('/') {
Some(pos) => {
let key_dir = self.key_path.clone() + "/" + &key[0..pos];
let dirs = create_dirs(&self.key_path, &key[0..pos])?;
if self.auto_flush {
for dir in dirs {
let d = dir[..dir.rfind('/').unwrap()].to_string();
if !dts.contains(&d) {
dts.push(d);
}
}
}
key_dir
}
None => self.key_path.clone(),
};
let temp_file = self.key_path.clone() + "/" + key.as_str() + ".tmp";
let content = engine.se.unwrap().serialize(&value)?;
let mut hasher = Sha256::new();
hasher.update(&content);
let digest = hasher.finish();
let mut file = fs::File::create(&temp_file)?;
let is_binary = engine.is_serialization_binary();
if engine.checksums {
if is_binary {
file.write_all(&digest)?;
} else {
file.write_all(hex::encode(&digest).as_bytes())?;
file.write_all(&[0x0A_u8])?;
}
let stime = match stime {
Some(v) => v,
None => timestamp_ns!(),
};
if is_binary {
file.write_all(&stime.to_le_bytes())?;
} else {
file.write_all(hex::encode(&stime.to_le_bytes()).as_bytes())?;
file.write_all(&[0x0A_u8])?;
}
}
file.write_all(&content)?;
if self.auto_flush {
file.flush()?;
file.sync_all()?;
}
drop(file);
fs::rename(&temp_file, key_file)?;
if self.auto_flush && !dts.contains(&key_dir) {
dts.push(key_dir);
}
self.cache.pop(&key);
self.cache.put(key, value);
if self.auto_flush {
for dir in dts {
let _r = sync_dir(&dir);
}
}
Ok(())
}
fn purge_cache_by_path(&mut self, key: &str) {
let key = if key.ends_with('/') {
key.to_owned()
} else {
key.to_owned() + "/"
};
trace!("purging cache for {}*", key);
let to_remove: Vec<_> = self
.cache
.iter()
.filter(|&(k, _)| k.starts_with(&key))
.map(|(k, _)| k.clone())
.collect();
for k in to_remove {
self.cache.pop(&k);
}
}
fn _delete_key(
&mut self,
key: &str,
recursive: bool,
no_flush: bool,
dir_only: bool,
) -> Result<(), Error> {
trace!(
"deleting key: {}, recursive: {}, no_flush: {}, dir_only: {}",
key,
recursive,
no_flush,
dir_only
);
let engine = get_engine!(self);
let key = fmt_key(key);
if key.starts_with(".trash/") || key == ".trash" {
return Err(Error::new(
ErrorKind::KeyNotFound,
"Use purge to remove trashed items",
));
}
if key.is_empty() && !recursive {
return Ok(());
}
self.cache.pop(&key);
let mut dts: Vec<String> = Vec::new();
let dn = self.key_path.clone() + "/" + key.as_str();
if Path::new(&dn).is_dir() && recursive {
let trashed = format!(
"{}/{}.{}",
self.trash_path.clone(),
key.replace('/', "_"),
timestamp_ns!()
);
trace!("renaming dir {} to {}", &dn, &trashed);
fs::create_dir_all(&self.trash_path)?;
let mut options = fs_extra::dir::CopyOptions::new();
options.copy_inside = true;
options.overwrite = true;
fs_extra::dir::move_dir(&dn, &trashed, &options)?;
dts.push(dn);
self.purge_cache_by_path(&key);
}
let mut key_dir = match key.rfind('/') {
Some(n) => {
let key_path = &key[..n];
self.key_path.clone() + "/" + key_path
}
None => self.key_path.clone(),
};
if !dir_only && !key.is_empty() {
let key_file =
self.key_path.clone() + "/" + key.as_str() + engine.get_suffix().as_str();
let trashed = format!(
"{}/{}.{}{}",
self.trash_path.clone(),
key.replace('/', "_"),
timestamp_ns!(),
engine.get_suffix()
);
trace!("renaming file {} to {}", &key_file, &trashed);
fs::create_dir_all(&self.trash_path)?;
let mut options = fs_extra::file::CopyOptions::new();
options.overwrite = true;
let _r = fs_extra::file::move_file(&key_file, &trashed, &options);
if !dts.contains(&key_dir) {
dts.push(key_dir.clone());
}
}
loop {
if key_dir == self.key_path {
if self.auto_flush && !dts.contains(&key_dir) {
dts.push(key_dir);
}
break;
}
if fs::remove_dir(&key_dir).is_err() {
if self.auto_flush && !dts.contains(&key_dir) {
dts.push(key_dir);
}
break;
}
key_dir = key_dir[0..key_dir.rfind('/').unwrap()].to_owned();
}
if self.auto_flush && !no_flush {
for dir in dts {
let _r = sync_dir(&dir);
}
let _r = sync_dir(&self.trash_path);
}
Ok(())
}
fn list_subkeys(&self, key: &str, hidden: bool) -> Result<Vec<String>, Error> {
trace!("listing subkeys of {}, hidden: {}", key, hidden);
let engine = get_engine!(self);
let key = fmt_key(key);
let mut result: Vec<String> = Vec::new();
let mut pattern = self.key_path.clone();
let suffix = engine.get_suffix();
let suffix_len = suffix.len();
let path_len = self.key_path.len();
if !key.is_empty() {
pattern += "/";
pattern += key.as_str();
pattern += "/";
}
pattern += "/**/*";
pattern += suffix.as_str();
for entry in (glob(pattern.as_str())?).flatten() {
let k = entry.to_str().unwrap();
let key_name = &k[path_len + 1..k.len() - suffix_len];
if hidden
|| (!key_name.starts_with('.')
&& !key_name.contains("/.")
&& !RE_BAK.is_match(key_name))
{
result.push(key_name.to_owned());
}
}
Ok(result)
}
fn get_key_data(
&mut self,
key: DataKey,
extended_info: bool,
) -> Result<(Value, Option<KeyInfo>), Error> {
trace!("getting key {:?}, extended_info: {}", key, extended_info);
let engine = get_engine!(self);
if key.is_name() {
let key = fmt_key(key.get());
if key.is_empty() {
return Err(Error::new(ErrorKind::KeyNotFound, key));
} else if !extended_info {
if let Some(v) = self.cache.get(&key) {
trace!("using cached value for {}", key);
return Ok((v.clone(), None));
}
}
}
let is_binary = engine.is_serialization_binary();
let key_file = match key {
DataKey::File(v) => v.to_owned(),
DataKey::Name(v) => self.key_path.clone() + "/" + v + engine.get_suffix().as_str(),
};
let buf = match fs::read(&key_file) {
Ok(v) => v,
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
return Err(Error::new(ErrorKind::KeyNotFound, key.get()))
}
Err(e) => return Err(Error::new(ErrorKind::IOError, e)),
};
let checksum: Option<[u8; 32]>;
let stime: Option<u64>;
let value: Value = engine.se.unwrap().deserialize(if engine.checksums {
if (is_binary && buf.len() < 41) || (!is_binary && buf.len() < 83) {
return Err(Error::new(
ErrorKind::DataError,
format!("the key file is corrupted: {}", key_file),
));
}
let mut hasher = Sha256::new();
if is_binary {
hasher.update(&buf[40..buf.len()]);
} else {
hasher.update(&buf[82..buf.len()]);
}
let digest = hasher.finish();
if (is_binary && digest != buf[0..32])
|| (!is_binary && digest != *hex::decode(&buf[0..64])?.as_slice())
{
return Err(Error::new(
ErrorKind::DataError,
format!("checksum does not match: {}", key_file),
));
}
checksum = Some(digest);
if is_binary {
stime = Some(u64::from_le_bytes([
buf[32], buf[33], buf[34], buf[35], buf[36], buf[37], buf[38], buf[39],
]));
&buf[40..buf.len()]
} else {
let s = hex::decode(&buf[65..81])?;
stime = Some(u64::from_le_bytes([
s[0], s[1], s[2], s[3], s[4], s[5], s[6], s[7],
]));
&buf[82..buf.len()]
}
} else {
checksum = None;
stime = None;
&buf
})?;
self.cache.put(key.get().to_owned(), value.clone());
Ok((
value,
if extended_info {
let metadata = fs::metadata(&key_file)?;
Some(KeyInfo {
key_file,
checksum,
metadata,
stime,
})
} else {
None
},
))
}
fn list_key_and_subkeys(&mut self, key: &str, hidden: bool) -> Result<Vec<String>, Error> {
let mut result = match self.list_subkeys(key, hidden) {
Ok(v) => v,
Err(e) => return Err(e),
};
match self.key_exists(key) {
Ok(v) => {
if v {
result.push(key.to_owned());
}
}
Err(e) => return Err(e),
};
Ok(result)
}
#[allow(clippy::case_sensitive_file_extension_comparisons)]
fn _purge(&mut self, keep_broken: bool) -> Result<Vec<String>, Error> {
trace!("purge requested, keep_broken: {}", keep_broken);
let mut result: Vec<String> = Vec::new();
let engine = get_engine!(self);
let mut dts: Vec<String> = Vec::new();
let path_len = self.key_path.len();
let suffix = engine.get_suffix();
let suffix_len = suffix.len();
trace!("cleaning up trash");
fs::remove_dir_all(&self.trash_path)?;
trace!("cleaning up files and broken keys");
for entry in glob(&(self.key_path.clone() + "/**/*"))? {
match entry {
Ok(p) => {
let k = p.to_str().unwrap();
let mut need_remove = false;
if !p.is_dir() && k != self.trash_path {
if k.ends_with(&suffix) {
if !keep_broken {
let key_name = k[path_len + 1..k.len() - suffix_len].to_owned();
if self.get_key_data(DataKey::Name(&key_name), false).is_err() {
result.push(key_name);
need_remove = true;
}
}
} else if !keep_broken || !k.ends_with(".tmp") {
need_remove = true;
}
}
if need_remove {
fs::remove_file(k)?;
if self.auto_flush {
let parent = p.parent().unwrap().to_str().unwrap().to_owned();
if !dts.contains(&parent) {
dts.push(parent);
}
}
}
}
Err(e) => {
error!("error while browsing db directory: {}", e);
}
}
}
trace!("cleaning up directories");
let mut dirs: Vec<String> = Vec::new();
for entry in glob(&(self.key_path.clone() + "/**"))? {
match entry {
Ok(p) => {
if p.is_dir() {
dirs.push(p.to_str().unwrap().to_string());
}
}
Err(e) => {
return Err(Error::new(ErrorKind::IOError, e));
}
}
}
dirs.sort();
dirs.reverse();
for d in dirs {
if fs::remove_dir(&d).is_ok() {
let parent = d[..d.rfind('/').unwrap()].to_string();
if self.auto_flush && !dts.contains(&parent) {
dts.push(parent);
}
}
}
if self.auto_flush {
for dir in dts {
let _r = sync_dir(&dir);
}
}
fs::create_dir_all(&self.trash_path)?;
self.cache.clear();
sync_dir(&self.key_path)?;
trace!("purge completed");
Ok(result)
}
pub fn key_load_from_serialized(&mut self, data: &[Value]) -> Result<(), Error> {
for d in data {
match d {
Value::Array(v) => {
let key: String = match v[0].as_str() {
Some(v) => v.to_string(),
None => {
return Err(Error::new(
ErrorKind::DataError,
format!("Invalid key name '{:?}'", v),
))
}
};
self.set_key_data(&key, v[1].clone(), None, true)?;
}
_ => {
return Err(Error::new(
ErrorKind::DataError,
format!("Invalid record '{}'", d),
))
}
}
}
Ok(())
}
pub fn info(&self) -> Result<DBInfo, Error> {
let engine = get_engine!(self);
Ok(DBInfo {
repair_recommended: self.repair_recommended,
auto_flush: self.auto_flush,
cached_keys: self.cache.len(),
cache_size: self.cache.cap(),
auto_bak: self.auto_bak,
strict_schema: self.strict_schema,
path: self.path.clone(),
lock_path: self.lock_path.clone(),
server: (SERVER_ID.to_owned(), VERSION.to_owned()),
fmt: engine.se.unwrap().to_string(),
checksums: engine.checksums,
created: engine.created,
version: engine.version,
})
}
#[allow(clippy::cast_possible_truncation)]
pub fn server_set(&mut self, name: &str, value: Value) -> Result<(), Error> {
trace!("setting server option {}={}", name, value);
macro_rules! invalid_server_option_value {
($n:expr, $value: expr) => {
return Err(Error::new(
ErrorKind::DataError,
format!("Invalid server option value {}={}", $n, $value),
))
};
}
match name {
"auto_flush" => match value {
Value::Bool(v) => self.auto_flush = v,
_ => invalid_server_option_value!(name, &value),
},
"auto_bak" => match value.as_u64() {
Some(v) => self.auto_bak = v,
_ => invalid_server_option_value!(name, &value),
},
"strict_schema" => match value {
Value::Bool(v) => self.strict_schema = v,
_ => invalid_server_option_value!(name, &value),
},
"repair_recommended" => match value {
Value::Bool(v) => self.repair_recommended = v,
_ => invalid_server_option_value!(name, &value),
},
"cache_size" => match &value {
Value::Number(v) => {
let size: u64 = match v.as_u64() {
Some(v) => v,
None => invalid_server_option_value!(name, &value),
};
self.cache.resize(size as usize);
}
_ => invalid_server_option_value!(name, &value),
},
_ => {
return Err(Error::new(
ErrorKind::DataError,
format!("Invalid server option {}", name),
))
}
}
Ok(())
}
pub fn purge(&mut self) -> Result<Vec<String>, Error> {
self._purge(false)
}
pub fn safe_purge(&mut self) -> Result<(), Error> {
match self._purge(true) {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
pub fn purge_cache(&mut self) -> Result<(), Error> {
self.cache.clear();
Ok(())
}
pub fn key_delete(&mut self, key: &str) -> Result<(), Error> {
if self.need_backup(key) {
for n in 1..=self.auto_bak {
let key_name = format!("{}.bak{}", key, n);
if let Err(e) = self._delete_key(&key_name, false, false, false) {
if e.kind() != ErrorKind::KeyNotFound {
return Err(e);
}
}
}
}
self._delete_key(key, false, false, false)
}
pub fn key_delete_recursive(&mut self, key: &str) -> Result<(), Error> {
self._delete_key(key, true, false, false)
}
fn key_exists(&mut self, key: &str) -> Result<bool, Error> {
let key = fmt_key(key);
if key.is_empty() {
return Ok(false);
}
let engine = get_engine!(self);
if self.cache.contains(&key) {
Ok(true)
} else {
let key_file =
self.key_path.clone() + "/" + key.as_str() + engine.get_suffix().as_str();
Ok(Path::new(&key_file).exists())
}
}
#[allow(clippy::cast_possible_truncation)]
pub fn key_explain(&mut self, key: &str) -> Result<KeyExplained, Error> {
let v = match self.get_key_data(DataKey::Name(key), true) {
Ok(v) => v,
Err(e) => return Err(e),
};
let value = v.0;
let info = v.1.unwrap();
let value_len = value.get_len();
let value_type = value.get_type();
Ok(KeyExplained {
value,
schema: self.find_schema_key(key)?,
len: value_len,
tp: value_type,
mtime: info
.metadata
.modified()?
.duration_since(SystemTime::UNIX_EPOCH)?
.as_nanos() as u64,
sha256: info.checksum,
stime: info.stime,
size: info.metadata.len(),
file: info.key_file,
})
}
pub fn key_get(&mut self, key: &str) -> Result<Value, Error> {
Ok(self.get_key_data(DataKey::Name(key), false)?.0)
}
pub fn key_get_recursive(&mut self, key: &str) -> Result<Vec<(String, Value)>, Error> {
let mut result = Vec::new();
for key in self.list_key_and_subkeys(key, false)? {
let value = self.get_key_data(DataKey::Name(&key), false)?.0;
result.push((key, value));
}
Ok(result)
}
pub fn key_get_field(&mut self, key: &str, field: &str) -> Result<Value, Error> {
let mut data: &Value = &self.key_get(key)?;
for f in field.split('/') {
match data {
Value::Object(v) => match v.get(f) {
Some(value) => data = value,
None => {
return Err(Error::new(
ErrorKind::FieldNotFound,
format!("no such field '{}'", field),
));
}
},
_ => return Err(Error::new(ErrorKind::DataError, "field is not an object")),
}
}
Ok(data.clone())
}
pub fn key_set_field(&mut self, key: &str, field: &str, value: Value) -> Result<(), Error> {
let mut key_data: Value = match self.key_get(key) {
Ok(v) => v,
Err(e) if e.kind() == ErrorKind::KeyNotFound => Value::from(serde_json::Map::new()),
Err(e) => return Err(e),
};
let mut data_ptr = &mut key_data;
let fname = match field.rfind('/') {
Some(pos) => {
for f in field[..pos].split('/') {
match data_ptr.as_object_mut() {
Some(v) => {
if !v.contains_key(f) {
let m = serde_json::Map::new();
let d = Value::from(m);
v.insert(f.to_owned(), d);
}
data_ptr = if let Some(v) = v.get_mut(f) {
v
} else {
return Err(Error::new(
ErrorKind::DataError,
"unable to get field",
));
}
}
_ => {
return Err(Error::new(ErrorKind::DataError, "field is not an object"))
}
}
}
&field[pos + 1..]
}
None => field,
};
match data_ptr.as_object_mut() {
Some(o) => {
let oldval = o.get(fname);
if oldval.is_none() || oldval.unwrap() != &value || !self.write_modified_only {
o.insert(fname.to_owned(), value);
self.cache.pop(&fmt_key(key));
self.key_set(key, key_data)?;
}
Ok(())
}
None => Err(Error::new(ErrorKind::DataError, "field is not object")),
}
}
pub fn key_delete_field(&mut self, key: &str, field: &str) -> Result<(), Error> {
let mut key_data: Value = match self.key_get(key) {
Ok(v) => v,
Err(e) if e.kind() == ErrorKind::KeyNotFound => Value::from(serde_json::Map::new()),
Err(e) => return Err(e),
};
let mut data_ptr = &mut key_data;
let fname = match field.rfind('/') {
Some(pos) => {
for f in field[..pos].split('/') {
match data_ptr.as_object_mut() {
Some(v) => {
if !v.contains_key(f) {
return Ok(());
}
data_ptr = if let Some(v) = v.get_mut(f) {
v
} else {
return Err(Error::new(
ErrorKind::DataError,
"unable to get field",
));
}
}
_ => {
return Err(Error::new(ErrorKind::DataError, "field is not an object"))
}
}
}
&field[pos + 1..]
}
None => field,
};
match data_ptr.as_object_mut() {
Some(o) => {
if o.contains_key(fname) || !self.write_modified_only {
o.remove(fname);
self.cache.pop(&fmt_key(key));
self.key_set(key, key_data)?;
}
Ok(())
}
None => Err(Error::new(ErrorKind::DataError, "field is not object")),
}
}
pub fn key_set(&mut self, key: &str, value: Value) -> Result<(), Error> {
self.validate_schema(key, &value)?;
if self.need_backup(key) {
for n in (1..=self.auto_bak).rev() {
let key_from = if n == 1 {
key.to_owned()
} else {
format!("{}.bak{}", key, n - 1)
};
let key_to = format!("{}.bak{}", key, n);
match self._rename(&key_from, &key_to, false, true) {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::KeyNotFound => {}
Err(e) => return Err(e),
}
}
}
self.set_key_data(key, value, None, true)
}
pub fn key_list(&mut self, key: &str) -> Result<Vec<String>, Error> {
self.list_key_and_subkeys(key, false)
}
pub fn key_list_all(&mut self, key: &str) -> Result<Vec<String>, Error> {
self.list_key_and_subkeys(key, true)
}
pub fn key_copy(&mut self, key: &str, dst_key: &str) -> Result<(), Error> {
trace!("copying key {} to {}", key, dst_key);
let value = self.get_key_data(DataKey::Name(key), false)?.0;
self.set_key_data(dst_key, value, None, false)
}
pub fn key_increment(&mut self, key: &str) -> Result<i64, Error> {
trace!("incrementing key {}", key);
let key = fmt_key(key);
let mut value = match self.get_key_data(DataKey::Name(&key), false) {
Ok(v) => match v.0.as_i64() {
Some(n) => n,
None => return Err(Error::new(ErrorKind::DataError, "Unable to increment key")),
},
Err(ref e) if e.kind() == ErrorKind::KeyNotFound => 0_i64,
Err(e) => return Err(e),
};
if value == std::i64::MAX {
return Err(Error::new(ErrorKind::DataError, "counter overflow"));
}
value += 1;
match self.set_key_data(&key, Value::from(value), None, false) {
Ok(_) => Ok(value),
Err(e) => Err(e),
}
}
pub fn key_decrement(&mut self, key: &str) -> Result<i64, Error> {
trace!("decrementing key {}", key);
let key = fmt_key(key);
let mut value = match self.get_key_data(DataKey::Name(&key), false) {
Ok(v) => match v.0.as_i64() {
Some(n) => n,
None => return Err(Error::new(ErrorKind::DataError, "Unable to decrement key")),
},
Err(ref e) if e.kind() == ErrorKind::KeyNotFound => 0_i64,
Err(e) => return Err(e),
};
if value == std::i64::MIN {
return Err(Error::new(ErrorKind::DataError, "counter overflow"));
}
value -= 1;
match self.set_key_data(&key, Value::from(value), None, false) {
Ok(_) => Ok(value),
Err(e) => Err(e),
}
}
pub fn key_rename(&mut self, key: &str, dst_key: &str) -> Result<(), Error> {
self._rename(key, dst_key, true, false)
}
fn _rename(
&mut self,
key: &str,
dst_key: &str,
flush: bool,
key_only: bool,
) -> Result<(), Error> {
trace!("renaming key {} to {}", key, dst_key);
let engine = get_engine!(self);
let mut dts: Vec<String> = Vec::new();
let key = fmt_key(key);
let dst_key = fmt_key(dst_key);
if key.is_empty() || dst_key.is_empty() {
return Err(Error::new(ErrorKind::KeyNotFound, key));
}
let pos = dst_key.rfind('/');
let dst_key_path = match pos {
Some(p) => &dst_key[..p],
None => "",
};
let dst_key_dir = self.key_path.clone() + "/" + dst_key_path;
let dirs = create_dirs(&self.key_path, dst_key_path)?;
if self.auto_flush && flush {
for dir in dirs {
let d = dir[..dir.rfind('/').unwrap()].to_string();
if !dts.contains(&d) {
dts.push(d);
}
}
}
let mut renamed = false;
let key_file = self.key_path.clone() + "/" + key.as_str() + engine.get_suffix().as_str();
let dst_key_file =
self.key_path.clone() + "/" + dst_key.as_str() + engine.get_suffix().as_str();
let mut options = fs_extra::file::CopyOptions::new();
options.overwrite = true;
match fs_extra::file::move_file(&key_file, &dst_key_file, &options) {
Ok(_) => {
renamed = true;
if let Some(v) = self.cache.pop(&key) {
self.cache.put(dst_key.clone(), v);
}
if self.auto_flush && flush {
let d1 = key_file[..key_file.rfind('/').unwrap()].to_string();
if !dts.contains(&d1) {
dts.push(d1);
}
if !dts.contains(&dst_key_dir) {
dts.push(dst_key_dir);
}
}
}
Err(e) => match e.kind {
fs_extra::error::ErrorKind::NotFound => {}
_ => return Err(Error::new(ErrorKind::IOError, e)),
},
};
if !key_only {
let dir_name = self.key_path.clone() + "/" + key.as_str();
let dst_dir_name = self.key_path.clone() + "/" + dst_key.as_str();
let mut options = fs_extra::dir::CopyOptions::new();
options.copy_inside = true;
options.overwrite = true;
match fs_extra::dir::move_dir(&dir_name, &dst_dir_name, &options) {
Ok(_) => {
renamed = true;
self.purge_cache_by_path(&dir_name);
if self.auto_flush && flush {
let d1 = dir_name[..dir_name.rfind('/').unwrap()].to_string();
let d2 = dst_dir_name[..dst_dir_name.rfind('/').unwrap()].to_string();
if !dts.contains(&d1) {
dts.push(d1);
}
if !dts.contains(&d2) {
dts.push(d2);
}
}
}
Err(e) => match e.kind {
fs_extra::error::ErrorKind::NotFound => {}
fs_extra::error::ErrorKind::Io(e)
if e.kind() == io::ErrorKind::InvalidInput => {}
_ => return Err(Error::new(ErrorKind::IOError, e)),
},
};
}
if self.auto_flush && flush {
for dir in dts {
let _r = sync_dir(&dir);
}
}
if renamed {
self._delete_key(&key, false, false, true)?;
Ok(())
} else {
Err(Error::new(ErrorKind::KeyNotFound, key))
}
}
#[allow(clippy::case_sensitive_file_extension_comparisons)]
pub fn check(&mut self) -> Result<Vec<String>, Error> {
trace!("checking the database");
let engine = get_engine!(self);
let mut broken: Vec<String> = Vec::new();
let suffix = engine.get_suffix();
let path_len = self.key_path.len();
for entry in glob(&(self.key_path.clone() + "/**/*"))? {
match entry {
Ok(p) => {
let key_file = p.to_str().unwrap().to_string();
if key_file.ends_with(&suffix) {
if self.get_key_data(DataKey::File(&key_file), false).is_err() {
let key =
key_file[path_len + 1..key_file.rfind('.').unwrap()].to_string();
trace!("broken key found: {}", key);
broken.push(key);
}
} else if key_file.ends_with(".tmp") {
let key = key_file[path_len + 1..key_file.rfind('.').unwrap()].to_string();
trace!("lost key found: {}", key);
broken.push(key);
}
}
Err(e) => {
error!("error while browsing db directory: {}", e);
}
}
}
Ok(broken)
}
pub fn repair(&mut self) -> Result<Vec<(String, bool)>, Error> {
trace!("repairing the database");
let engine = get_engine!(self);
let mut result: Vec<(String, bool)> = Vec::new();
let mut dts: Vec<String> = Vec::new();
let suffix = engine.get_suffix();
let path_len = self.key_path.len();
self.cache.clear();
for entry in glob(&(self.key_path.clone() + "/**/*.tmp"))? {
match entry {
Ok(p) => {
let key_file = p.to_str().unwrap().to_string();
if self.get_key_data(DataKey::File(&key_file), false).is_ok() {
fs::rename(
&key_file,
&(key_file[..key_file.rfind('.').unwrap()].to_string() + &suffix),
)?;
let key = key_file[path_len + 1..key_file.rfind('.').unwrap()].to_string();
trace!("recovered lost key {}", key);
result.push((key, true));
} else {
fs::remove_file(&key_file)?;
let key = key_file[path_len + 1..key_file.rfind('.').unwrap()].to_string();
trace!("deleted broken key {}", key);
result.push((key, false));
}
if self.auto_flush {
let parent = key_file[..key_file.rfind('/').unwrap()].to_owned();
if !dts.contains(&parent) {
dts.push(parent);
}
}
}
Err(e) => {
error!("error while browsing db directory: {}", e);
}
}
}
if self.auto_flush {
for dir in dts {
let _r = sync_dir(&dir);
}
}
for key in self._purge(false)? {
result.push((key, false));
}
trace!("repair completed");
self.repair_recommended = false;
Ok(result)
}
pub fn key_dump(&mut self, key: &str) -> Result<Vec<(String, Value)>, Error> {
trace!("dump requested for {}", key);
let mut result = Vec::new();
for key in self.list_key_and_subkeys(key, true)? {
if let Ok(v) = self.get_key_data(DataKey::Name(&key), false) {
trace!("dumped key {}", key);
result.push((key, v.0));
}
}
Ok(result)
}
pub fn key_load(&mut self, data: Vec<(String, Value)>) -> Result<(), Error> {
trace!("key load requested");
for d in data {
trace!("loading key {}", d.0);
self.set_key_data(&d.0, d.1, None, true)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_db() {
use super::*;
use serde_json::map::Map;
use serde_json::Value;
use std::fs;
let db_path = "/tmp/yedb-test-db";
for checksums in vec![false, true] {
for db_format in vec!["json", "yaml", "msgpack", "cbor"] {
let _ = fs::remove_dir_all(&db_path);
let mut db = Database::new();
db.set_db_path(&db_path).unwrap();
db.set_default_fmt(&db_format, checksums).unwrap();
db.set_cache_size(100);
db.open().unwrap();
let i = db.info().unwrap();
assert_eq!(i.repair_recommended, false);
assert_eq!(i.auto_flush, true);
assert_eq!(i.cached_keys, 0);
assert_eq!(i.cache_size, 100);
assert_eq!(i.path, db_path);
assert_eq!(i.fmt, db_format);
assert_eq!(i.checksums, checksums);
db.server_set("auto_flush", Value::from(false)).unwrap();
db.server_set("repair_recommended", Value::from(true))
.unwrap();
db.server_set("cache_size", Value::from(1000)).unwrap();
let i = db.info().unwrap();
assert_eq!(i.repair_recommended, true);
assert_eq!(i.auto_flush, false);
assert_eq!(i.cache_size, 1000);
db.purge().unwrap();
db.safe_purge().unwrap();
db.key_set("test", Value::from(123)).unwrap();
db.key_set("x/y/z", Value::from("test")).unwrap();
db.key_set(".a", Value::from("z")).unwrap();
assert_eq!(db.key_list("/").unwrap(), vec!["test", "x/y/z"]);
assert_eq!(db.key_list_all("/").unwrap(), vec![".a", "test", "x/y/z"]);
assert_eq!(db.key_get("test").unwrap().as_u64().unwrap(), 123);
let i = db.info().unwrap();
assert_eq!(i.cached_keys, 3);
db.purge_cache().unwrap();
let i = db.info().unwrap();
assert_eq!(i.cached_keys, 0);
db.key_delete("test").unwrap();
assert_eq!(db.key_get("test").is_err(), true);
db.key_rename("x/y/z", "x/y/a").unwrap();
assert_eq!(db.key_get("x/y/z").is_err(), true);
assert_eq!(db.key_get("x/y/a").unwrap().as_str().unwrap(), "test");
db.key_delete_recursive("x/y").unwrap();
assert_eq!(db.key_get("x/y/a").is_err(), true);
db.key_copy(".a", "a/b/c").unwrap();
assert_eq!(db.key_get("a/b/c").unwrap().as_str().unwrap(), "z");
let mut schema = Map::new();
schema.insert("type".to_owned(), Value::from("number"));
db.key_set(".schema/n", Value::from(schema)).unwrap();
assert_eq!(db.key_set("n/x", Value::from("test")).is_err(), true);
db.key_set("n/x", Value::from(123)).unwrap();
let ki = db.key_explain("n/x").unwrap();
assert_eq!(ki.value, Value::from(123));
assert_eq!(ki.schema, Some(".schema/n".to_owned()));
assert_eq!(ki.len, None);
assert_eq!(ki.tp, "number");
assert_eq!(ki.sha256.is_some(), checksums);
let se = SerializationEngine::from_string(db_format).unwrap();
let key_path = db_path.to_owned() + "/keys/n/x" + se.suffix(checksums).as_str();
assert_eq!(ki.file, key_path);
db.close().unwrap();
}
}
let _ = fs::remove_dir_all(&db_path);
}
}