#![allow(clippy::type_complexity)]
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::BTreeMap, path::PathBuf};
use armour_core::persist::Persist;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use crate::compaction::Compactor;
use crate::hook::TypedWriteHook;
use crate::shutdown::ShutdownSignal;
use crate::{Codec, CollectionMeta, Config, DbResult, Key, TreeMeta};
use crate::{TypedMap, TypedTree, ZeroMap, ZeroTree};
use super::collection::Collection;
use super::migration::{TypedMigration, ZeroMigration};
use super::seq::SeqGen;
/// Top-level metadata persisted for the whole database (stored in the
/// `db.info` file; see `Db::open_with_compaction`).
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
pub struct DbInfo {
    /// Per-collection bookkeeping, keyed by collection name.
    pub collections: BTreeMap<String, CollectionInfo>,
}
/// Per-collection metadata persisted inside [`DbInfo`].
#[derive(Serialize, Deserialize, Default, Debug, Clone, Copy)]
pub struct CollectionInfo {
    /// Schema version the on-disk data was last written with; compared
    /// against `TreeMeta::version` to decide whether a migration must run.
    pub version: u16,
    /// Hash of the collection's type definition (`TreeMeta::ty.h()`).
    pub typ_hash: u64,
    /// NOTE(review): despite the name, this is written with the collection's
    /// *length* (`save_info` / `save_collection_len` both store `len()` here).
    /// `serde(default)` keeps files written before this field existed readable.
    #[serde(default)]
    pub seq: u64,
}
/// How often the background compactor runs when the caller uses `Db::open`
/// and does not override the interval.
const DEFAULT_COMPACTION_INTERVAL: Duration = Duration::from_secs(60);
/// An open database: a directory of versioned collection trees plus the
/// shared infrastructure around them (sequence generator, background
/// compaction, shutdown signalling, optional RPC registration).
pub struct Db {
    /// Root directory containing all collection files.
    pub path: PathBuf,
    /// Persisted per-collection metadata (versions, type hashes, lengths).
    pub db_info: Persist<DbInfo>,
    /// Shared monotonic id generator, backed by the `__seq` file.
    pub seq: Arc<SeqGen>,
    /// Signal used to stop the compactor and RPC tasks on shutdown.
    pub(crate) shutdown: ShutdownSignal,
    /// Every opened collection, type-erased; the list is snapshot-cloned by
    /// compaction so the lock is never held while compacting.
    collections: Arc<Mutex<Vec<Arc<dyn Collection>>>>,
    /// RPC handlers keyed by the xxh3 hash of the collection name.
    #[cfg(feature = "rpc")]
    handlers: Arc<Mutex<Vec<(u64, Arc<dyn super::handler::RpcHandler>)>>>,
    /// Handles to running RPC servers; drained (dropped) during `shutdown`.
    #[cfg(feature = "rpc")]
    pub(crate) rpc_handles: Mutex<Vec<super::rpc::RpcHandle>>,
    /// Background compaction task, present only when enabled at open time.
    compactor: Option<Compactor>,
}
impl Db {
    /// Opens (or creates) the database at `path` with the default background
    /// compaction interval ([`DEFAULT_COMPACTION_INTERVAL`]).
    pub fn open(path: impl AsRef<std::path::Path>) -> DbResult<Self> {
        Self::open_with_compaction(path, Some(DEFAULT_COMPACTION_INTERVAL))
    }

    /// Opens (or creates) the database at `path`.
    ///
    /// With `compaction_interval = Some(d)` a background [`Compactor`] is
    /// started that compacts every registered collection each period; `None`
    /// disables it (callers can still run [`Db::compact`] manually).
    ///
    /// # Errors
    /// Fails if the directory cannot be created or the `__seq` file cannot
    /// be opened.
    pub fn open_with_compaction(
        path: impl AsRef<std::path::Path>,
        compaction_interval: Option<Duration>,
    ) -> DbResult<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path).map_err(crate::DbError::Io)?;
        let db_info = Persist::open(path.join("db.info"));
        let seq = SeqGen::open(path.join("__seq"))?;
        let collections: Arc<Mutex<Vec<Arc<dyn Collection>>>> = Arc::new(Mutex::new(Vec::new()));
        let shutdown = ShutdownSignal::new();
        let compactor = compaction_interval.map(|interval| {
            let cols = collections.clone();
            Compactor::start_with_signal(
                move || {
                    // Clone the collection list under the lock, then compact
                    // without holding it, so opening new collections is never
                    // blocked behind a long compaction pass.
                    let snapshot = cols.lock().clone();
                    let mut total = 0;
                    for c in &snapshot {
                        total += c.compact()?;
                    }
                    Ok(total)
                },
                interval,
                shutdown.clone(),
            )
        });
        Ok(Self {
            path,
            db_info,
            seq,
            shutdown,
            collections,
            #[cfg(feature = "rpc")]
            handlers: Arc::new(Mutex::new(Vec::new())),
            #[cfg(feature = "rpc")]
            rpc_handles: Mutex::new(Vec::new()),
            compactor,
        })
    }

    /// Directory for a collection's data: `<db path>/<name>:v<version>`.
    pub fn tree_path(&self, name: &str, version: u16) -> PathBuf {
        self.path.join(format!("{name}:v{version}"))
    }

    /// Returns the next id from the named sequence.
    pub fn next_id(&self, name: &str) -> DbResult<u64> {
        self.seq.next_id(name)
    }

    /// Last persisted length of collection `T`.
    ///
    /// NOTE(review): this reads the `seq` value recorded in `db.info`, which
    /// is written at open time and by [`Db::close`] — it can lag the live
    /// in-memory length between those points.
    pub fn collection_len<T: CollectionMeta>(&self) -> u64 {
        self.stored_info(T::NAME).seq
    }

    /// Persists each collection's current length and flushes the sequence
    /// generator. Does not stop background tasks; see [`Db::shutdown`].
    pub fn close(&self) -> DbResult<()> {
        for c in self.collections.lock().iter() {
            self.save_collection_len(c.name(), c.len() as u64);
        }
        self.seq.flush()
    }

    /// Orderly shutdown: raises the shutdown signal, drops any RPC server
    /// handles, stops the compactor, then closes the database.
    /// Also invoked from `Drop`, so calling it twice must be tolerated.
    pub fn shutdown(&mut self) -> DbResult<()> {
        self.shutdown.shutdown();
        #[cfg(feature = "rpc")]
        for h in self.rpc_handles.get_mut().drain(..) {
            drop(h);
        }
        if let Some(ref mut c) = self.compactor {
            c.stop();
        }
        self.close()
    }

    /// A clone of the shutdown signal, for wiring into external tasks.
    pub fn shutdown_signal(&self) -> ShutdownSignal {
        self.shutdown.clone()
    }

    /// Compacts every registered collection once, returning the summed
    /// per-collection compaction counts.
    pub fn compact(&self) -> DbResult<usize> {
        // Same snapshot-then-compact pattern as the background compactor.
        let snapshot = self.collections.lock().clone();
        let mut total = 0;
        for c in &snapshot {
            total += c.compact()?;
        }
        Ok(total)
    }

    /// Opens the ordered, codec-serialized tree for `T` using `C::default()`
    /// and no write hook.
    pub fn open_typed_tree<T, C>(
        &self,
        config: Config,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedTree<T::SelfId, T, C>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        C: Codec<T> + Default + 'static,
    {
        self.open_typed_tree_inner(config, C::default(), crate::NoHook, migrations)
    }

    /// Like [`Db::open_typed_tree`], with a caller-supplied write hook.
    pub fn open_typed_tree_hooked<T, C, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedTree<T::SelfId, T, C, H>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        C: Codec<T> + Default + 'static,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_typed_tree_inner(config, C::default(), hook, migrations)
    }

    /// Shared open path for typed trees: open the versioned file, run any
    /// pending migration (or replay the log when none ran), record metadata,
    /// then register the collection — and its RPC handler when enabled.
    fn open_typed_tree_inner<T, C, H>(
        &self,
        config: Config,
        codec: C,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedTree<T::SelfId, T, C, H>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        C: Codec<T> + Default + 'static,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let tree =
            TypedTree::open_hooked(self.tree_path(meta.name, meta.version), config, codec, hook)?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| tree.migrate(mfn))?;
        if !migrated {
            tree.replay_init();
        }
        // Persist the (possibly bumped) version and current length up front.
        self.save_info(&meta, tree.len() as u64);
        let tree = Arc::new(tree);
        self.collections.lock().push(tree.clone());
        #[cfg(feature = "rpc")]
        self.register_typed_tree_handler::<T, C, H>(&meta, tree.clone());
        Ok(tree)
    }

    /// Opens the hash-keyed, codec-serialized map for `T` using
    /// `C::default()` and no write hook.
    pub fn open_typed_map<T, C>(
        &self,
        config: Config,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedMap<T::SelfId, T, C>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        C: Codec<T> + Default + 'static,
    {
        self.open_typed_map_inner(config, C::default(), crate::NoHook, migrations)
    }

    /// Like [`Db::open_typed_map`], with a caller-supplied write hook.
    pub fn open_typed_map_hooked<T, C, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedMap<T::SelfId, T, C, H>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        C: Codec<T> + Default + 'static,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_typed_map_inner(config, C::default(), hook, migrations)
    }

    /// Shared open path for typed maps; mirrors `open_typed_tree_inner`.
    fn open_typed_map_inner<T, C, H>(
        &self,
        config: Config,
        codec: C,
        hook: H,
        migrations: &[TypedMigration<T::SelfId, T>],
    ) -> DbResult<Arc<TypedMap<T::SelfId, T, C, H>>>
    where
        T: CollectionMeta + Clone + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        C: Codec<T> + Default + 'static,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let map =
            TypedMap::open_hooked(self.tree_path(meta.name, meta.version), config, codec, hook)?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| map.migrate(mfn))?;
        if !migrated {
            map.replay_init();
        }
        self.save_info(&meta, map.len() as u64);
        let map = Arc::new(map);
        self.collections.lock().push(map.clone());
        #[cfg(feature = "rpc")]
        self.register_typed_map_handler::<T, C, H>(&meta, map.clone());
        Ok(map)
    }

    /// Opens the ordered zero-copy tree for `T` (fixed record size `V`)
    /// with no write hook.
    pub fn open_zero_tree<T, const V: usize>(
        &self,
        config: Config,
        migrations: &[ZeroMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroTree<T::SelfId, V, T>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
    {
        self.open_zero_tree_inner(config, crate::NoHook, migrations)
    }

    /// Like [`Db::open_zero_tree`], with a caller-supplied write hook.
    pub fn open_zero_tree_hooked<T, const V: usize, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[ZeroMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroTree<T::SelfId, V, T, H, crate::durability::Bitcask>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_zero_tree_inner(config, hook, migrations)
    }

    /// Shared open path for zero-copy trees; mirrors `open_typed_tree_inner`
    /// (no codec — records are fixed-size `Copy` types).
    fn open_zero_tree_inner<T, const V: usize, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[ZeroMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroTree<T::SelfId, V, T, H, crate::durability::Bitcask>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let tree = ZeroTree::open_hooked(self.tree_path(meta.name, meta.version), config, hook)?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| tree.migrate(mfn))?;
        if !migrated {
            tree.replay_init();
        }
        self.save_info(&meta, tree.len() as u64);
        let tree = Arc::new(tree);
        self.collections.lock().push(tree.clone());
        #[cfg(feature = "rpc")]
        self.register_zero_tree_handler::<T, V, H>(&meta, tree.clone());
        Ok(tree)
    }

    /// Opens the hash-keyed zero-copy map for `T` (fixed record size `V`)
    /// with no write hook.
    pub fn open_zero_map<T, const V: usize>(
        &self,
        config: Config,
        migrations: &[ZeroMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroMap<T::SelfId, V, T>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
    {
        self.open_zero_map_inner(config, crate::NoHook, migrations)
    }

    /// Like [`Db::open_zero_map`], with a caller-supplied write hook.
    pub fn open_zero_map_hooked<T, const V: usize, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[ZeroMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroMap<T::SelfId, V, T, H, crate::durability::Bitcask>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.open_zero_map_inner(config, hook, migrations)
    }

    /// Shared open path for zero-copy maps; mirrors `open_zero_tree_inner`.
    fn open_zero_map_inner<T, const V: usize, H>(
        &self,
        config: Config,
        hook: H,
        migrations: &[ZeroMigration<T::SelfId, T>],
    ) -> DbResult<Arc<ZeroMap<T::SelfId, V, T, H, crate::durability::Bitcask>>>
    where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: TypedWriteHook<T::SelfId, T> + 'static,
    {
        let meta = TreeMeta::of::<T>();
        let stored = self.stored_info(meta.name);
        let map = ZeroMap::open_hooked(self.tree_path(meta.name, meta.version), config, hook)?;
        let migrated = self.run_migration(&meta, &stored, migrations, |mfn| map.migrate(mfn))?;
        if !migrated {
            map.replay_init();
        }
        self.save_info(&meta, map.len() as u64);
        let map = Arc::new(map);
        self.collections.lock().push(map.clone());
        #[cfg(feature = "rpc")]
        self.register_zero_map_handler::<T, V, H>(&meta, map.clone());
        Ok(map)
    }

    /// Persisted metadata for `name`, or default (version 0, empty) when the
    /// collection has never been recorded.
    fn stored_info(&self, name: &str) -> CollectionInfo {
        self.db_info
            .cloned()
            .collections
            .get(name)
            .copied()
            .unwrap_or_default()
    }

    /// Runs the migration registered for the stored version when it differs
    /// from the current one.
    ///
    /// Returns `Ok(true)` if a migration ran, `Ok(false)` otherwise — either
    /// the versions already match, or no migration covers the stored version
    /// (in which case the data is opened as-is).
    fn run_migration<K, T>(
        &self,
        meta: &TreeMeta,
        stored: &CollectionInfo,
        migrations: &[TypedMigration<K, T>],
        migrate_fn: impl FnOnce(&super::migration::TypedMigrationFn<K, T>) -> DbResult<usize>,
    ) -> DbResult<bool> {
        if stored.version != meta.version
            && let Some((_, mfn)) = migrations.iter().find(|(v, _)| *v == stored.version)
        {
            let mutated = migrate_fn(mfn)?;
            tracing::info!(
                mutated,
                from = stored.version,
                to = meta.version,
                "migration"
            );
            return Ok(true);
        }
        Ok(false)
    }

    /// Records a collection's current version, type hash, and length in
    /// `db.info` (inserting the entry if missing).
    fn save_info(&self, meta: &TreeMeta, seq: u64) {
        let typ_hash = meta.ty.h();
        self.db_info.update(|info| {
            info.collections.insert(
                meta.name.to_owned(),
                CollectionInfo {
                    version: meta.version,
                    typ_hash,
                    seq,
                },
            );
        });
    }

    /// Updates only the stored length of an already-recorded collection;
    /// a no-op for unknown names.
    fn save_collection_len(&self, name: &str, len: u64) {
        self.db_info.update(|info| {
            if let Some(ci) = info.collections.get_mut(name) {
                ci.seq = len;
            }
        });
    }
}
/// RPC support: each opened collection is wrapped in a type-erased handler
/// so remote callers can address it by the hash of its name.
#[cfg(feature = "rpc")]
impl Db {
    /// Registers `handler` under the xxh3 hash of the collection name.
    fn register_handler(&self, name: &str, handler: Arc<dyn super::handler::RpcHandler>) {
        let hashname = xxhash_rust::xxh3::xxh3_64(name.as_bytes());
        self.handlers.lock().push((hashname, handler));
    }

    /// Wraps a typed tree in a `TypedTreeHandler` and registers it.
    fn register_typed_tree_handler<T, C, H>(
        &self,
        meta: &TreeMeta,
        tree: Arc<TypedTree<T::SelfId, T, C, H>>,
    ) where
        T: CollectionMeta + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        C: Codec<T> + Default + 'static,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::TypedTreeHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                tree,
                // Handlers get their own codec instance so they don't share
                // state with the tree's codec.
                codec: Arc::new(C::default()),
                seq: self.seq.clone(),
            }),
        );
    }

    /// Wraps a typed map in a `TypedMapHandler` and registers it.
    fn register_typed_map_handler<T, C, H>(
        &self,
        meta: &TreeMeta,
        map: Arc<TypedMap<T::SelfId, T, C, H>>,
    ) where
        T: CollectionMeta + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        C: Codec<T> + Default + 'static,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::TypedMapHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                map,
                codec: Arc::new(C::default()),
                seq: self.seq.clone(),
            }),
        );
    }

    /// Wraps a zero-copy tree in a `ZeroTreeHandler` and registers it
    /// (no codec — records are fixed-size).
    fn register_zero_tree_handler<T, const V: usize, H>(
        &self,
        meta: &TreeMeta,
        tree: Arc<ZeroTree<T::SelfId, V, T, H, crate::durability::Bitcask>>,
    ) where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Ord + Send + Sync,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::ZeroTreeHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                tree,
                seq: self.seq.clone(),
            }),
        );
    }

    /// Wraps a zero-copy map in a `ZeroMapHandler` and registers it.
    fn register_zero_map_handler<T, const V: usize, H>(
        &self,
        meta: &TreeMeta,
        map: Arc<ZeroMap<T::SelfId, V, T, H, crate::durability::Bitcask>>,
    ) where
        T: CollectionMeta + Copy + Send + Sync + 'static,
        T::SelfId: Key + Send + Sync + Hash + Eq,
        H: crate::hook::TypedWriteHook<T::SelfId, T> + 'static,
    {
        self.register_handler(
            meta.name,
            Arc::new(super::handler::ZeroMapHandler {
                name: meta.name.to_owned(),
                typ_hash: meta.ty.h(),
                version: meta.version,
                map,
                seq: self.seq.clone(),
            }),
        );
    }

    /// Builds the name-hash → handler map consumed by the RPC server,
    /// snapshotting the handlers registered so far.
    pub fn build_tree_map(&self) -> super::rpc::TreeMap {
        Arc::new(
            self.handlers
                .lock()
                .iter()
                .map(|(h, handler)| (*h, handler.clone()))
                .collect(),
        )
    }
}
impl Drop for Db {
fn drop(&mut self) {
let _ = self.shutdown();
}
}