#![allow(clippy::type_complexity)]
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::BTreeMap, path::PathBuf};
use armour_core::persist::Persist;
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::compaction::Compactor;
use crate::hook::TypedWriteHook;
use crate::shutdown::ShutdownSignal;
use crate::{Codec, CollectionMeta, Config, DbResult, FixedConfig, Key, TreeMeta};
use crate::{TypedMap, TypedTree, ZeroMap, ZeroTree};
use super::collection::Collection;
use super::migration::TypedMigration;
use super::seq::SeqGen;
/// Database-level bookkeeping persisted in the `db.info` file.
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
pub struct DbInfo {
    /// User-managed database schema version (see `Db::set_version`).
    /// `#[serde(default)]` keeps files written before this field existed readable.
    #[serde(default)]
    pub version: u32,
    /// Per-collection bookkeeping, keyed by collection name.
    pub collections: BTreeMap<String, CollectionInfo>,
}
/// Persisted bookkeeping for a single collection (stored in `DbInfo::collections`).
#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy)]
pub struct CollectionInfo {
    /// Collection schema version last written to disk; compared against the
    /// code's `TreeMeta::version` to decide whether a migration should run.
    pub version: u16,
    /// Hash of the stored type (written from `meta.ty.h()` in `Db::save_info`);
    /// presumably used to detect incompatible type changes — confirm at call sites.
    pub typ_hash: u64,
    /// Despite the name, this holds the collection's element count as persisted
    /// at open/close time (see `Db::close` / `Db::save_info`).
    #[serde(default)]
    pub seq: u64,
}
/// User-supplied metadata persisted in the `db.user` file: one optional
/// database-wide JSON blob plus one JSON blob per collection name. Values are
/// kept as `serde_json::Value` so callers can round-trip arbitrary types.
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
struct UserMeta {
    // Database-wide metadata blob (see `Db::metadata` / `Db::set_metadata`).
    db: Option<serde_json::Value>,
    // Per-collection metadata blobs, keyed by collection name.
    collections: BTreeMap<String, serde_json::Value>,
}
/// Background-compaction cadence used by [`Db::open`] when the caller does not
/// pick an interval explicitly.
const DEFAULT_COMPACTION_INTERVAL: Duration = Duration::from_secs(60);
/// Top-level database handle: owns the on-disk directory, the persisted
/// bookkeeping files, the sequence generator, the registry of open
/// collections, and the optional background compactor.
pub struct Db {
    /// Root directory of the database (created by [`Db::open`]).
    pub path: PathBuf,
    /// Persisted database/collection bookkeeping (`db.info`).
    pub db_info: Persist<DbInfo>,
    /// Persisted user-supplied JSON metadata (`db.user`).
    user_meta: Persist<UserMeta>,
    /// Named id generator backed by the `__seq` file.
    pub seq: Arc<SeqGen>,
    /// Signal used to stop the background compactor (and, with `rpc`, wired
    /// into RPC shutdown).
    pub(crate) shutdown: ShutdownSignal,
    /// Every collection opened through this handle. The compactor clones a
    /// snapshot of this Vec so the lock is held only briefly.
    collections: Arc<Mutex<Vec<Arc<dyn Collection>>>>,
    /// (xxh3-64 hash of collection name, handler) pairs for RPC dispatch.
    #[cfg(feature = "rpc")]
    handlers: Arc<Mutex<Vec<(u64, Arc<dyn super::handler::RpcHandler>)>>>,
    /// Live RPC server handles; drained (and thereby dropped) on shutdown.
    #[cfg(feature = "rpc")]
    pub(crate) rpc_handles: Mutex<Vec<super::rpc::RpcHandle>>,
    /// Background compactor; `None` when compaction is disabled.
    compactor: Option<Compactor>,
}
impl Db {
    /// Opens (or creates) the database at `path` with the default 60-second
    /// background-compaction interval.
    pub fn open(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
        Self::open_with_compaction(path, Some(DEFAULT_COMPACTION_INTERVAL))
    }

    /// Opens (or creates) the database at `path`.
    ///
    /// When `compaction_interval` is `Some`, a background compactor is started
    /// that periodically compacts every collection registered on this handle;
    /// `None` disables background compaction entirely.
    ///
    /// # Errors
    /// Fails if the directory cannot be created or the sequence file cannot be
    /// opened. Note `Persist::open` itself is infallible here (no `?`).
    pub fn open_with_compaction(
        path: impl AsRef<std::path::Path>,
        compaction_interval: Option<Duration>,
    ) -> DbResult<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path).map_err(crate::DbError::Io)?;
        let db_info = Persist::open(path.join("db.info"));
        let user_meta = Persist::open(path.join("db.user"));
        let seq = SeqGen::open(path.join("__seq"))?;
        let collections: Arc<Mutex<Vec<Arc<dyn Collection>>>> = Arc::new(Mutex::new(Vec::new()));
        let shutdown = ShutdownSignal::new();
        let compactor = compaction_interval.map(|interval| {
            let cols = collections.clone();
            Compactor::start_with_signal(
                move || {
                    // Snapshot the registry under the lock, then compact
                    // without holding it, so collections can still be opened
                    // while compaction runs.
                    let snapshot = cols.lock().clone();
                    let mut total = 0;
                    for c in &snapshot {
                        total += c.compact()?;
                    }
                    Ok(total)
                },
                interval,
                shutdown.clone(),
            )
        });
        Ok(Self {
            path,
            db_info,
            user_meta,
            seq,
            shutdown,
            collections,
            #[cfg(feature = "rpc")]
            handlers: Arc::new(Mutex::new(Vec::new())),
            #[cfg(feature = "rpc")]
            rpc_handles: Mutex::new(Vec::new()),
            compactor,
        })
    }

    /// Returns the persisted database schema version.
    pub fn version(&self) -> u32 {
        self.db_info.cloned().version
    }

    /// Atomically rewrites the database version via `f(current)` and returns
    /// the new value.
    pub fn set_version(&self, f: impl FnOnce(u32) -> u32) -> u32 {
        let mut new_version = 0;
        self.db_info.update(|info| {
            new_version = f(info.version);
            new_version = new_version;
            info.version = new_version;
        });
        new_version
    }

    /// Deserializes the database-wide user metadata as `M`, if present.
    /// Decode failures are silently treated as absent (`.ok()`).
    pub fn metadata<M: DeserializeOwned>(&self) -> Option<M> {
        let um = self.user_meta.cloned();
        um.db.and_then(|v| serde_json::from_value(v).ok())
    }

    /// Replaces the database-wide user metadata: `f` receives the current
    /// value (if present and decodable as `M`) and returns the value to store.
    ///
    /// # Panics
    /// Panics if the new value fails to serialize to JSON.
    pub fn set_metadata<M: Serialize + DeserializeOwned>(
        &self,
        f: impl FnOnce(Option<M>) -> M,
    ) -> M {
        let mut result = None;
        self.user_meta.update(|um| {
            let current = um
                .db
                .as_ref()
                .and_then(|v| serde_json::from_value(v.clone()).ok());
            let new_val = f(current);
            um.db = Some(serde_json::to_value(&new_val).expect("serialize metadata"));
            result = Some(new_val);
        });
        result.expect("update executed")
    }

    /// Deserializes the user metadata stored for collection `name`, if any.
    pub fn collection_metadata<M: DeserializeOwned>(&self, name: &str) -> Option<M> {
        let um = self.user_meta.cloned();
        um.collections
            .get(name)
            .and_then(|v| serde_json::from_value(v.clone()).ok())
    }

    /// Replaces the user metadata for collection `name`; same contract (and
    /// panic condition) as [`Db::set_metadata`].
    pub fn set_collection_metadata<M: Serialize + DeserializeOwned>(
        &self,
        name: &str,
        f: impl FnOnce(Option<M>) -> M,
    ) -> M {
        let mut result = None;
        self.user_meta.update(|um| {
            let current = um
                .collections
                .get(name)
                .and_then(|v| serde_json::from_value(v.clone()).ok());
            let new_val = f(current);
            um.collections.insert(
                name.to_owned(),
                serde_json::to_value(&new_val).expect("serialize metadata"),
            );
            result = Some(new_val);
        });
        result.expect("update executed")
    }

    /// On-disk directory for a versioned collection: `<db>/<name>:v<version>`.
    pub fn tree_path(&self, name: &str, version: u16) -> PathBuf {
        self.path.join(format!("{name}:v{version}"))
    }

    /// Returns the next id from the named sequence.
    pub fn next_id(&self, name: &str) -> DbResult<u64> {
        self.seq.next_id(name)
    }

    /// Returns the element count persisted for collection `T` in `db.info`.
    /// NOTE(review): this is the value written at open/close time (the `seq`
    /// field), not a live count — confirm callers expect that.
    pub fn collection_len<T: CollectionMeta>(&self) -> u64 {
        self.stored_info(T::NAME).seq
    }

    /// Persists each open collection's current length into `db.info` and
    /// flushes the sequence generator. Called by [`Db::shutdown`].
    pub fn close(&self) -> DbResult<()> {
        for c in self.collections.lock().iter() {
            // One db_info.update per collection; could be batched into a
            // single update, but this only runs at close time.
            self.save_collection_len(c.name(), c.len() as u64);
        }
        self.seq.flush()
    }

    /// Full teardown: signals background work to stop, drops RPC handles
    /// (with the `rpc` feature), stops the compactor, then persists final
    /// state via [`Db::close`]. Also invoked from `Drop`.
    pub fn shutdown(&mut self) -> DbResult<()> {
        self.shutdown.shutdown();
        #[cfg(feature = "rpc")]
        for h in self.rpc_handles.get_mut().drain(..) {
            drop(h);
        }
        if let Some(ref mut c) = self.compactor {
            c.stop();
        }
        self.close()
    }

    /// Clone of the shutdown signal, e.g. for wiring into external tasks.
    pub fn shutdown_signal(&self) -> ShutdownSignal {
        self.shutdown.clone()
    }

    /// Synchronously compacts every registered collection; returns the total
    /// count reported by each collection's `compact`.
    pub fn compact(&self) -> DbResult<usize> {
        // Same snapshot-then-compact pattern as the background compactor.
        let snapshot = self.collections.lock().clone();
        let mut total = 0;
        for c in &snapshot {
            total += c.compact()?;
        }
        Ok(total)
    }

    /// Opens (or creates) a codec-backed ordered tree for `T`: opens the
    /// versioned directory, runs at most one migration (see
    /// [`Db::run_migration`]), records version/type-hash/len in `db.info`,
    /// registers the tree with this handle, and (with `rpc`) installs an RPC
    /// handler for it.
    pub fn open_typed_tree<T, C, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedTree<T::SelfId, T, C, H>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        C: Codec<T> + Default + 'static,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let tree = TypedTree::open_hooked(
            self.tree_path(meta.name, meta.version),
            config,
            C::default(),
            hook,
        )?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| tree.migrate(mfn))?;
        if !migrated {
            // replay_init is skipped when a migration ran — presumably the
            // migration performs its own replay; confirm in TypedTree.
            tree.replay_init();
        }
        self.save_info(&meta, tree.len() as u64);
        let tree = Arc::new(tree);
        self.collections.lock().push(tree.clone());
        #[cfg(feature = "rpc")]
        self.register_typed_tree_handler::<T, C, H>(&meta, tree.clone());
        Ok(tree)
    }

    /// Hash-map variant of [`Db::open_typed_tree`]; same open/migrate/
    /// register flow with `Hash + Eq` keys instead of `Ord`.
    pub fn open_typed_map<T, C, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedMap<T::SelfId, T, C, H>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        C: Codec<T> + Default + 'static,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let map = TypedMap::open_hooked(
            self.tree_path(meta.name, meta.version),
            config,
            C::default(),
            hook,
        )?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| map.migrate(mfn))?;
        if !migrated {
            map.replay_init();
        }
        self.save_info(&meta, map.len() as u64);
        let map = Arc::new(map);
        self.collections.lock().push(map.clone());
        #[cfg(feature = "rpc")]
        self.register_typed_map_handler::<T, C, H>(&meta, map.clone());
        Ok(map)
    }

    /// Opens a zero-copy ordered tree for `Copy` type `T` with the Bitcask
    /// durability backend; delegates to [`Db::open_zero_tree_with`].
    pub fn open_zero_tree<T, const V: usize, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroTree<T::SelfId, V, T, H, crate::durability::Bitcask>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_zero_tree_with(
            |path, hook| ZeroTree::open_hooked(path, config, hook),
            hook,
            migrations,
        )
    }

    /// Shared open/migrate/register flow for zero-copy trees; `open_fn`
    /// abstracts over the durability-specific constructor.
    fn open_zero_tree_with<T, const V: usize, H, D>(
        &self,
        open_fn: impl FnOnce(PathBuf, H) -> DbResult<ZeroTree<T::SelfId, V, T, H, D>>,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroTree<T::SelfId, V, T, H, D>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: TypedWriteHook<T::SelfId, T> + 'static,
        D: crate::durability::Durability + 'static,
        ZeroTree<T::SelfId, V, T, H, D>: Collection,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let tree = open_fn(self.tree_path(meta.name, meta.version), hook)?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| tree.migrate(mfn))?;
        if !migrated {
            tree.replay_init();
        }
        self.save_info(&meta, tree.len() as u64);
        let tree = Arc::new(tree);
        self.collections.lock().push(tree.clone());
        #[cfg(feature = "rpc")]
        self.register_zero_tree_handler::<T, V, H, D>(&meta, tree.clone());
        Ok(tree)
    }

    /// Fixed-durability variant of [`Db::open_zero_tree`].
    pub fn open_zero_tree_fixed<T, const V: usize, H>(
        &self,
        config: FixedConfig,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroTree<T::SelfId, V, T, H, crate::durability::Fixed>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_zero_tree_with(
            |path, hook| ZeroTree::open_with_hook(path, config, hook),
            hook,
            migrations,
        )
    }

    /// Opens a zero-copy hash map for `Copy` type `T` with the Bitcask
    /// durability backend; delegates to [`Db::open_zero_map_with`].
    pub fn open_zero_map<T, const V: usize, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroMap<T::SelfId, V, T, H, crate::durability::Bitcask>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_zero_map_with(
            |path, hook| ZeroMap::open_hooked(path, config, hook),
            hook,
            migrations,
        )
    }

    /// Shared open/migrate/register flow for zero-copy maps; `open_fn`
    /// abstracts over the durability-specific constructor.
    fn open_zero_map_with<T, const V: usize, H, D>(
        &self,
        open_fn: impl FnOnce(PathBuf, H) -> DbResult<ZeroMap<T::SelfId, V, T, H, D>>,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroMap<T::SelfId, V, T, H, D>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: TypedWriteHook<T::SelfId, T> + 'static,
        D: crate::durability::Durability + 'static,
        ZeroMap<T::SelfId, V, T, H, D>: Collection,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let map = open_fn(self.tree_path(meta.name, meta.version), hook)?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| map.migrate(mfn))?;
        if !migrated {
            map.replay_init();
        }
        self.save_info(&meta, map.len() as u64);
        let map = Arc::new(map);
        self.collections.lock().push(map.clone());
        #[cfg(feature = "rpc")]
        self.register_zero_map_handler::<T, V, H, D>(&meta, map.clone());
        Ok(map)
    }

    /// Fixed-durability variant of [`Db::open_zero_map`].
    pub fn open_zero_map_fixed<T, const V: usize, H>(
        &self,
        config: FixedConfig,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroMap<T::SelfId, V, T, H, crate::durability::Fixed>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_zero_map_with(
            |path, hook| ZeroMap::open_with_hook(path, config, hook),
            hook,
            migrations,
        )
    }

    /// Looks up the persisted [`CollectionInfo`] for `name`, defaulting to
    /// zeroes for a collection never seen before. Note: clones the whole
    /// `db_info` snapshot per call.
    fn stored_info(&self, name: &str) -> CollectionInfo {
        self.db_info
            .cloned()
            .collections
            .get(name)
            .copied()
            .unwrap_or_default()
    }

    /// Runs a single migration step: only when the stored version differs
    /// from the code's version AND a migration registered for exactly the
    /// stored version exists. Returns `Ok(true)` iff a migration ran.
    ///
    /// NOTE(review): migrations are not chained — a collection two versions
    /// behind needs a migration registered for that exact version.
    fn run_migration<K, T>(
        &self,
        meta: &TreeMeta,
        stored: &CollectionInfo,
        migrations: &[TypedMigration<K, T>],
        migrate_fn: impl FnOnce(&super::migration::TypedMigrationFn<K, T>) -> DbResult<usize>,
    ) -> DbResult<bool> {
        // let-chain syntax (`&& let`): requires a recent Rust toolchain.
        if stored.version != meta.version
            && let Some((_, mfn)) = migrations.iter().find(|(v, _)| *v == stored.version)
        {
            let mutated = migrate_fn(mfn)?;
            tracing::info!(
                mutated,
                from = stored.version,
                to = meta.version,
                "migration"
            );
            return Ok(true);
        }
        Ok(false)
    }

    /// Records `meta`'s version, type hash, and the given length under the
    /// collection's name in `db.info`.
    fn save_info(&self, meta: &TreeMeta, seq: u64) {
        let typ_hash = meta.ty.h();
        self.db_info.update(|info| {
            info.collections.insert(
                meta.name.to_owned(),
                CollectionInfo {
                    version: meta.version,
                    typ_hash,
                    seq,
                },
            );
        });
    }

    /// Updates the persisted length for an already-registered collection;
    /// silently a no-op when the collection has no `db.info` entry yet.
    fn save_collection_len(&self, name: &str, len: u64) {
        self.db_info.update(|info| {
            if let Some(ci) = info.collections.get_mut(name) {
                ci.seq = len;
            }
        });
    }
}
#[cfg(feature = "rpc")]
impl Db {
    /// Registers an RPC handler under the xxh3-64 hash of the collection
    /// name — presumably the wire protocol addresses collections by this
    /// hash (see [`Db::build_tree_map`]). NOTE(review): duplicate
    /// registrations for the same name are not rejected here.
    fn register_handler(&self, name: &str, handler: Arc<dyn super::handler::RpcHandler>) {
        let hashname = xxhash_rust::xxh3::xxh3_64(name.as_bytes());
        self.handlers.lock().push((hashname, handler));
    }

    /// Wraps a typed tree in a `TypedTreeHandler` (carrying name, type hash,
    /// version, a fresh codec, and the shared sequence generator) and
    /// registers it for RPC dispatch.
    fn register_typed_tree_handler<T, C, H>(
        &self,
        meta: &TreeMeta,
        tree: Arc<TypedTree<T::SelfId, T, C, H>>,
    ) where
        T: CollectionMeta + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        C: Codec<T> + Default + 'static,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::TypedTreeHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                tree,
                codec: Arc::new(C::default()),
                seq: self.seq.clone(),
            }),
        );
    }

    /// Map counterpart of the typed-tree registration above.
    fn register_typed_map_handler<T, C, H>(
        &self,
        meta: &TreeMeta,
        map: Arc<TypedMap<T::SelfId, T, C, H>>,
    ) where
        T: CollectionMeta + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        C: Codec<T> + Default + 'static,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::TypedMapHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                map,
                codec: Arc::new(C::default()),
                seq: self.seq.clone(),
            }),
        );
    }

    /// Registers an RPC handler for a zero-copy tree (no codec: values are
    /// fixed-layout `Copy` types).
    fn register_zero_tree_handler<T, const V: usize, H, D>(
        &self,
        meta: &TreeMeta,
        tree: Arc<ZeroTree<T::SelfId, V, T, H, D>>,
    ) where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
        D: crate::durability::Durability + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::ZeroTreeHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                tree,
                seq: self.seq.clone(),
            }),
        );
    }

    /// Map counterpart of the zero-tree registration above.
    fn register_zero_map_handler<T, const V: usize, H, D>(
        &self,
        meta: &TreeMeta,
        map: Arc<ZeroMap<T::SelfId, V, T, H, D>>,
    ) where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
        D: crate::durability::Durability + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::ZeroMapHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                map,
                seq: self.seq.clone(),
            }),
        );
    }

    /// Builds the hash → handler dispatch map consumed by the RPC layer,
    /// snapshotting the handlers registered so far.
    pub fn build_tree_map(&self) -> super::rpc::TreeMap {
        Arc::new(
            self.handlers
                .lock()
                .iter()
                .map(|(h, handler)| (*h, handler.clone()))
                .collect(),
        )
    }
}
impl Drop for Db {
    /// Best-effort teardown when the handle is dropped without an explicit
    /// [`Db::shutdown`] call. Errors are intentionally discarded: `drop`
    /// cannot propagate them and must not panic.
    fn drop(&mut self) {
        self.shutdown().ok();
    }
}