use std::{fmt::Debug, sync::Arc};
use crossbeam_skiplist::SkipMap;
use fjall::{Keyspace, KeyspaceCreateOptions, Slice};
use super::{
db::Db,
raw::{InnerFields, RawTree},
tree::TypedTree,
};
use crate::{GetType, Record, logdb::ReadTree, migration, utils::CollectionInfo};
/// Builder for a typed, versioned collection backed by a fjall keyspace.
///
/// Constructed via [`TreeBuilder::new`] / [`TreeBuilder::with_name`] and
/// consumed by [`TreeBuilder::build`] / [`TreeBuilder::build_with_handler`],
/// which run any pending schema migration and compute content hashpoints
/// before yielding the final [`TypedTree`].
pub struct TreeBuilder<Item: Record> {
    /// Logical collection name, without the version suffix that
    /// `migration::collection_name` appends to form the partition name.
    name: String,
    /// Handle to the owning database (cloned; `Db` is presumably cheap to
    /// clone — it is cloned freely throughout this file).
    db: Db,
    /// The already-opened partition for the *current* record version.
    tree: Keyspace,
    /// Stored collection metadata plus the (initially empty) hashpoint map,
    /// later shared with the built tree via `Arc` in `inner_build`.
    inner: InnerFields,
    /// Marks the record type without storing one.
    phantom: std::marker::PhantomData<Item>,
}
impl<Item: Record> TreeBuilder<Item> {
    /// Opens (creating if needed) the versioned partition for `name` and
    /// snapshots the stored [`CollectionInfo`] for the logical name.
    ///
    /// The physical partition name is derived from `name` plus
    /// `<Item as Record>::VERSION` (see `migration::collection_name`), so each
    /// record version lives in its own partition. Elapsed time is recorded in
    /// the `armour_logdb_builder_with_name_duration` histogram, labeled by
    /// collection name.
    #[instrument(name = "TreeBuilder::with_name", skip(db, options))]
    pub(crate) fn with_name(db: &Db, name: &str, options: Option<KeyspaceCreateOptions>) -> Self {
        let start = std::time::Instant::now();
        // Partition name embeds the current record version; exact format is
        // defined by `migration::collection_name`.
        let target_name = migration::collection_name(name, <Item as Record>::VERSION);
        let tree = db.open_tree(&target_name, options);
        debug!("tree opened");
        // Stored metadata is keyed by the *logical* (unversioned) name.
        // `cloned()` presumably yields an owned snapshot of the db metadata
        // (TODO confirm against `Db::db_info`); a missing entry falls back to
        // `CollectionInfo::default()` (first use of this collection).
        let info = db
            .db_info
            .cloned()
            .collections
            .get(name)
            .cloned()
            .unwrap_or_default();
        // Hashpoints start empty; they are populated later by
        // `migration::compute_hashpoints` during `inner_build`.
        let hashpoints = SkipMap::new();
        let inner = InnerFields { info, hashpoints };
        histogram!("armour_logdb_builder_with_name_duration", "name" => name.to_owned())
            .record(start.elapsed().as_secs_f64());
        Self {
            name: name.to_owned(),
            db: db.clone(),
            tree,
            inner,
            phantom: std::marker::PhantomData,
        }
    }

    /// Shorthand for [`Self::with_name`] using the record's canonical
    /// `<Item as Record>::NAME` as the collection name.
    pub(crate) fn new(db: &Db, options: Option<KeyspaceCreateOptions>) -> Self {
        Self::with_name(db, <Item as Record>::NAME, options)
    }

    /// Approximate number of entries in the underlying partition
    /// (delegates directly to the keyspace's `approximate_len`).
    pub fn approximate_len(&self) -> usize {
        self.tree.approximate_len()
    }

    /// Finalizes the builder into a [`TypedTree`], running any pending
    /// migration and computing hashpoints. See [`Self::inner_build`] for the
    /// full sequence and panic conditions.
    pub fn build(self) -> TypedTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
    {
        // `None::<fn(..)>` only pins a concrete `F` for the generic call;
        // no per-record handler is invoked.
        self.inner_build(None::<fn(&Item::SelfId, &Item)>)
    }

    /// Like [`Self::build`], but invokes `f` for every record visited during
    /// hashpoint computation (both on the non-empty-tree path and after a
    /// migration).
    pub fn build_with_handler<F>(self, f: F) -> TypedTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        self.inner_build(Some(f))
    }

    /// Builds the [`TypedTree`]:
    ///
    /// 1. Opens the companion `__<target>_removed` partition.
    /// 2. Assembles the `RawTree` / `TypedTree` around the already-open
    ///    partition.
    /// 3. If the target partition is **non-empty**: warns when the stored type
    ///    hash differs from the current one, then computes hashpoints over all
    ///    readable entries (invoking `f`, if given, per record).
    /// 4. If the target partition is **empty**: data for an older record
    ///    version may still live under the old versioned partition name, so a
    ///    migration is attempted when the stored version differs from
    ///    `VERSION` *and* a migration function is registered for the stored
    ///    version. Main and `_removed` partitions are migrated, the sources
    ///    deleted (deletion failures only logged), and hashpoints recomputed.
    /// 5. Persists the current `CollectionInfo` (type hash + version) under
    ///    the logical name and records a duration histogram.
    ///
    /// # Panics
    /// Panics if the emptiness check fails, the `_removed` partition cannot be
    /// opened, hashpoint computation fails, or a required migration fails.
    #[instrument(name = "TreeBuilder::build", skip(self, f), fields(name = self.name))]
    fn inner_build<F>(self, f: Option<F>) -> TypedTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        let start = std::time::Instant::now();
        let TreeBuilder {
            db,
            tree,
            inner,
            name,
            phantom: _,
        } = self;
        let is_empty = tree.is_empty().expect("cannot check if tree is empty");
        let info = inner.info;
        let target_name = migration::collection_name(&name, <Item as Record>::VERSION);
        // Companion partition for tombstoned/removed entries; created lazily
        // with default options.
        let removed = db
            .db
            .keyspace(&format!("__{target_name}_removed"), || {
                KeyspaceCreateOptions::default()
            })
            .expect("Failed to open partition");
        let raw = RawTree {
            db: db.clone(),
            name: name.clone(),
            partition_name: target_name.clone(),
            // Name hash used elsewhere as the collection's numeric identity
            // — presumably for keying/sequencing; verify against RawTree users.
            hashname: xxhash_rust::xxh3::xxh3_64(name.as_bytes()),
            attributes: &Item::ATTRIBUTES,
            tree,
            removed,
            // Hashpoints inside `inner` are shared from here on via Arc.
            inner: Arc::new(inner),
            meta_saved: Default::default(),
            seq_lock: Arc::default(),
        };
        let collection = TypedTree {
            inner: ReadTree {
                raw,
                phantom: std::marker::PhantomData,
            },
        };
        // Hash of the current compile-time type description — used to detect
        // schema drift against the value stored in `CollectionInfo`.
        let current_type_hash = <Item as GetType>::TYPE.h();
        if !is_empty {
            let instant = std::time::Instant::now();
            // Drift is only logged here; no corrective action is taken on
            // this (non-empty, same-version-partition) path.
            if info.typ_hash != current_type_hash {
                warn!(
                    "collection type hash changed from {:#X} to {:#X}",
                    info.typ_hash, current_type_hash
                );
            }
            debug!("starting tree compute_hashpoints");
            // Unreadable entries are logged and skipped rather than aborting
            // the scan.
            let iter = collection
                .tree()
                .iter()
                .filter_map(|item| match item.into_inner() {
                    Ok(kv) => Some(kv),
                    Err(e) => {
                        error!(%e);
                        None
                    }
                });
            // Populates `inner.hashpoints`; the trailing no-op closure is a
            // second per-entry callback slot left unused on this path.
            let (hash, count) = migration::compute_hashpoints::<Item, F>(
                iter,
                &collection.raw().inner.hashpoints,
                f,
                |_key, _val| {},
            )
            .expect("cannot compute hash");
            let time = instant.elapsed().as_millis();
            debug!(?hash, ?count, ?time, "tree compute_hashpoints end");
        } else {
            // Target partition is empty: any existing data for this
            // collection lives under the *old* versioned partition name, so
            // check whether we can migrate it forward.
            let needs_migration = info.version != <Item as Record>::VERSION
                && Item::MIGRATIONS
                    .iter()
                    .any(|(version, _)| *version == info.version);
            if needs_migration {
                // NOTE(review): MIGRATIONS is scanned twice — once by the
                // `any` above and again here; the `expect` cannot fire given
                // the `any` guard.
                let migration_fn = Item::MIGRATIONS
                    .iter()
                    .copied()
                    .find_map(|(version, mf)| {
                        if version == info.version {
                            info!(?version, "found migration");
                            Some(mf)
                        } else {
                            None
                        }
                    })
                    .expect("migration fn must exist");
                let source_name = migration::collection_name(&name, info.version);
                let source = db.open_tree(&source_name, None);
                // `unwrap_or(true)` treats a failed emptiness check as
                // "empty", silently skipping migration on error.
                if !source.is_empty().unwrap_or(true) {
                    let instant = std::time::Instant::now();
                    let migrated =
                        migration::run_migration::<Item>(&source, collection.tree(), migration_fn)
                            .expect("migration failed");
                    info!(migrated, "migration complete");
                    // Also migrate the companion "removed" partition, if one
                    // exists for the source version.
                    let source_removed_name = format!("__{source_name}_removed");
                    if let Some(source_removed) = db.get_keyspace(&source_removed_name) {
                        if !source_removed.is_empty().unwrap_or(true) {
                            let target_removed =
                                db.open_tree(&format!("__{target_name}_removed"), None);
                            let removed_migrated = migration::run_migration::<Item>(
                                &source_removed,
                                &target_removed,
                                migration_fn,
                            )
                            .expect("removed migration failed");
                            info!(removed_migrated, "removed entries migration complete");
                        }
                        // Deleted even when it was empty; failure is only
                        // logged, not propagated.
                        if let Err(e) = db.db.delete_keyspace(source_removed) {
                            error!(%e, "failed to delete source removed keyspace");
                        }
                    }
                    // Drop the old-version source partition; failure is only
                    // logged, not propagated.
                    if let Err(e) = db.db.delete_keyspace(source) {
                        error!(%e, "failed to delete source keyspace");
                    }
                    // Recompute hashpoints over the freshly migrated data;
                    // mirrors the non-empty path above.
                    let iter =
                        collection
                            .tree()
                            .iter()
                            .filter_map(|item| match item.into_inner() {
                                Ok(kv) => Some(kv),
                                Err(e) => {
                                    error!(%e);
                                    None
                                }
                            });
                    let (hash, count) = migration::compute_hashpoints::<Item, F>(
                        iter,
                        &collection.raw().inner.hashpoints,
                        f,
                        |_key, _val| {},
                    )
                    .expect("cannot compute hash");
                    let time = instant.elapsed().as_millis();
                    debug!(
                        ?hash,
                        ?count,
                        ?time,
                        "tree compute_hashpoints after migration"
                    );
                }
            }
        }
        // Persist the now-current metadata (type hash + version) under the
        // logical name, regardless of which path ran above.
        let version = <Item as Record>::VERSION;
        let collection_info = CollectionInfo {
            typ_hash: current_type_hash,
            version,
        };
        if version > 0 {
            debug!(version);
        }
        db.db_info.update(|info| {
            info.collections.insert(name.clone(), collection_info);
        });
        histogram!("armour_logdb_builder_build_duration", "name" => name.clone())
            .record(start.elapsed().as_secs_f64());
        collection
    }
}