use std::{
fmt::Debug,
sync::{Arc, atomic::AtomicU64},
};
use crossbeam_skiplist::SkipMap;
use fjall::{KeyspaceCreateOptions, OptimisticTxKeyspace, Readable, Slice};
use super::{
db::TxDb,
raw::{InnerFields, TxRawTree},
read::TxReadTree,
tree::{IndexesMap, TxTree},
};
use crate::{
GetType, Record,
indexes::IndexUpdateCollection,
migration,
utils::{CollectionInfo, EventHandlers, IdChangeEventFn, RemoveEventFn, UpsertEventFn},
};
/// Builder that assembles a [`TxTree`] for one record collection: it opens the
/// versioned keyspace, accumulates secondary indexes and event handlers, and
/// `build()` finally wires everything together (running a schema migration if
/// the stored version differs from `Item::VERSION`).
pub struct TxTreeBuilder<Item: Record> {
    // Logical collection name, WITHOUT the version suffix that
    // `migration::collection_name` appends for the physical keyspace.
    name: String,
    // Handle to the owning database; cloned into the built tree.
    db: TxDb,
    // The already-opened transactional keyspace for the current record version.
    tree: OptimisticTxKeyspace,
    // Persisted collection metadata (type hash, version), hashpoint map, and
    // the sequence counter seeded from the keyspace's approximate length.
    inner: InnerFields,
    // Secondary indexes registered via `add_index`; applied on every upsert.
    index_maps: IndexesMap<Item>,
    // Optional callbacks registered via `add_*_fn` (id-change / upsert / remove).
    handlers: EventHandlers<Item>,
}
impl<Item: Record> TxTreeBuilder<Item> {
    /// Opens (or creates) the versioned keyspace for `name` and captures the
    /// previously persisted [`CollectionInfo`] so that `build()` can later
    /// detect type-hash changes and pending migrations.
    ///
    /// `options` are forwarded to the keyspace open; `None` uses defaults.
    #[instrument(name = "TxTreeBuilder::with_name", skip(db, options))]
    pub(crate) fn with_name(db: &TxDb, name: &str, options: Option<KeyspaceCreateOptions>) -> Self {
        let start = std::time::Instant::now();
        // Physical keyspace name embeds the current record version.
        // NOTE(review): exact naming scheme lives in migration::collection_name — confirm there.
        let target_name = migration::collection_name(name, <Item as Record>::VERSION);
        let tree = db.open_tree(&target_name, options);
        debug!("tree opened");
        // Metadata persisted by a previous run for this collection; defaults
        // (zero hash / version) when the collection has never been opened.
        // NOTE(review): keyed by the UNversioned name, so it survives migrations.
        let info = db
            .db_info
            .cloned()
            .collections
            .get(name)
            .cloned()
            .unwrap_or_default();
        let hashpoints = SkipMap::new();
        // Sequence counter seeded from the (approximate) current entry count.
        let seq = AtomicU64::new(tree.approximate_len() as u64);
        let inner = InnerFields {
            info,
            hashpoints,
            seq,
        };
        // Metrics: how long opening + metadata lookup took, tagged by collection.
        histogram!("armour_logdb_tx_builder_with_name_duration", "name" => name.to_owned())
            .record(start.elapsed().as_secs_f64());
        Self {
            name: name.to_owned(),
            db: db.clone(),
            tree,
            inner,
            index_maps: Default::default(),
            handlers: Default::default(),
        }
    }
    /// Convenience constructor using the record type's canonical `NAME`.
    pub(crate) fn new(db: &TxDb, options: Option<KeyspaceCreateOptions>) -> Self {
        Self::with_name(db, <Item as Record>::NAME, options)
    }
    /// Registers a secondary index; it will be populated for existing data
    /// during `build()` and kept up to date on later writes.
    pub fn add_index<T>(mut self, index: T) -> Self
    where
        T: IndexUpdateCollection<Item> + 'static,
    {
        self.index_maps.push(Box::new(index));
        self
    }
    /// Sets the callback invoked when a record's id changes.
    /// NOTE(review): replaces any previously set handler — only one is kept.
    pub fn add_id_change_fns(mut self, f: IdChangeEventFn<Item>) -> Self {
        self.handlers.on_id_change = Some(f);
        self
    }
    /// Sets the callback invoked on every upsert (replaces any previous one).
    pub fn add_update_fn(mut self, f: UpsertEventFn<Item>) -> Self {
        self.handlers.on_upsert = Some(f);
        self
    }
    /// Sets the callback invoked on every removal (replaces any previous one).
    pub fn add_removed_fn(mut self, f: RemoveEventFn<Item>) -> Self {
        self.handlers.on_remove = Some(f);
        self
    }
    /// Approximate number of entries in the underlying keyspace.
    /// Delegates to fjall; may over/under-count, hence "approximate".
    pub fn approximate_len(&self) -> usize {
        self.tree.approximate_len()
    }
    /// Builds the tree without a per-record callback.
    pub fn build(self) -> TxTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
    {
        // Turbofish-able `None` so `inner_build` can still infer `F`.
        self.inner_build(None::<fn(&Item::SelfId, &Item)>)
    }
    /// Builds the tree, invoking `f` for each existing record while the
    /// initial hashpoints are computed.
    pub fn build_with_handler<F>(self, f: F) -> TxTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        self.inner_build(Some(f))
    }
    /// Core build path shared by `build` / `build_with_handler`.
    ///
    /// Steps:
    /// 1. Assemble the raw tree (including its `__<name>_removed` sibling keyspace).
    /// 2. If the target keyspace already has data: warn on type-hash drift and
    ///    compute hashpoints over all entries, populating indexes and firing
    ///    upsert events.
    /// 3. If it is empty: look for a registered migration from the stored
    ///    version; if the old-version keyspace has data, migrate it (and its
    ///    `_removed` sibling) into the target, then compute hashpoints.
    /// 4. Persist the fresh `CollectionInfo` (current type hash + version).
    #[instrument(name = "TxTreeBuilder::build", skip(self, f), fields(name = self.name))]
    fn inner_build<F>(self, f: Option<F>) -> TxTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        let start = std::time::Instant::now();
        let TxTreeBuilder {
            db,
            tree,
            inner,
            name,
            index_maps,
            handlers,
        } = self;
        // Emptiness is decided from an actual key lookup, not approximate_len,
        // so a stale approximation cannot steer the migration branch below.
        let is_empty = tree.first_key_value().is_none();
        // NOTE(review): this moves/copies `info` out of `inner` before `inner`
        // is Arc-wrapped below — relies on `CollectionInfo` being `Copy`/cheap.
        let info = inner.info;
        let target_name = migration::collection_name(&name, <Item as Record>::VERSION);
        // Sibling keyspace holding tombstones/removed entries for this collection.
        let removed = db
            .db
            .keyspace(&format!("__{target_name}_removed"), || {
                KeyspaceCreateOptions::default()
            })
            .expect("Failed to open partition");
        let raw = TxRawTree {
            db: db.clone(),
            name: name.clone(),
            partition_name: target_name.clone(),
            // Stable 64-bit hash of the logical name, used as the tree's hash id.
            hashname: xxhash_rust::xxh3::xxh3_64(name.as_bytes()),
            attributes: &Item::ATTRIBUTES,
            tree,
            removed: removed.inner().clone(),
            inner: Arc::new(inner),
            meta_saved: Default::default(),
        };
        let index_maps = Arc::new(index_maps);
        let handlers = Arc::new(handlers);
        let collection = TxTree {
            inner: TxReadTree {
                raw,
                phantom: std::marker::PhantomData,
            },
            index_maps,
            handlers,
        };
        let current_type_hash = <Item as GetType>::TYPE.h();
        if !is_empty {
            // Target keyspace already has data: no migration, just (re)build
            // the in-memory state from what is on disk.
            let instant = std::time::Instant::now();
            if info.typ_hash != current_type_hash {
                // Type layout changed without a version bump — surfaced only
                // as a warning; data is still read as-is.
                warn!(
                    "collection type hash changed from {:#X} to {:#X}",
                    info.typ_hash, current_type_hash
                );
            }
            debug!("starting tree compute_hashpoints");
            let tx = collection.raw().db.db.read_tx();
            // Iterate all entries; decode failures are logged and skipped
            // rather than aborting the build (best-effort recovery).
            let iter = tx
                .iter(collection.tree())
                .filter_map(|item| match item.into_inner() {
                    Ok(kv) => Some(kv),
                    Err(e) => {
                        error!(%e);
                        None
                    }
                });
            // Computes the rolling hashpoints and, per record, refreshes
            // secondary indexes and fires the upsert event. `f` (if any) is
            // also called per record inside compute_hashpoints.
            let (hash, count) = migration::compute_hashpoints::<Item, F>(
                iter,
                &collection.raw().inner.hashpoints,
                f,
                |key, val| {
                    collection.update_indexes(key, None, Some(val));
                    collection.upsert_event(key, None, val);
                },
            )
            .expect("cannot compute hash");
            let time = instant.elapsed().as_millis();
            debug!(?hash, ?count, ?time, "tree compute_hashpoints end");
        } else {
            // Target keyspace is empty: check whether data from an older
            // record version should be migrated in.
            let needs_migration = info.version != <Item as Record>::VERSION
                && Item::MIGRATIONS
                    .iter()
                    .any(|(version, _)| *version == info.version);
            if needs_migration {
                // Re-scan MIGRATIONS for the matching function; the `any`
                // above guarantees the `expect` cannot fire.
                let migration_fn = Item::MIGRATIONS
                    .iter()
                    .copied()
                    .find_map(|(version, mf)| {
                        if version == info.version {
                            info!(?version, "found migration");
                            Some(mf)
                        } else {
                            None
                        }
                    })
                    .expect("migration fn must exist");
                // Keyspace that holds the data written under the OLD version.
                let source_name = migration::collection_name(&name, info.version);
                let source_ks = db
                    .db
                    .inner()
                    .keyspace(&source_name, KeyspaceCreateOptions::default)
                    .expect("Failed to open source keyspace for migration");
                // `unwrap_or(true)` treats an emptiness-check failure as
                // "empty", i.e. skip migration rather than risk a bad run.
                if !source_ks.is_empty().unwrap_or(true) {
                    let instant = std::time::Instant::now();
                    let target_ks = db
                        .db
                        .inner()
                        .keyspace(&target_name, KeyspaceCreateOptions::default)
                        .expect("Failed to open target keyspace for migration");
                    // Copy/transform every old-version record into the target.
                    let migrated =
                        migration::run_migration::<Item>(&source_ks, &target_ks, migration_fn)
                            .expect("migration failed");
                    info!(migrated, "migration complete");
                    // Also migrate the old "__<source>_removed" sibling, if it
                    // exists and is non-empty, so tombstones carry over.
                    let source_removed_name = format!("__{source_name}_removed");
                    if let Ok(source_removed) = db
                        .db
                        .inner()
                        .keyspace(&source_removed_name, KeyspaceCreateOptions::default)
                    {
                        if !source_removed.is_empty().unwrap_or(true) {
                            let target_removed_name = format!("__{target_name}_removed");
                            let target_removed = db
                                .db
                                .inner()
                                .keyspace(&target_removed_name, KeyspaceCreateOptions::default)
                                .expect("Failed to open target removed keyspace");
                            let removed_migrated = migration::run_migration::<Item>(
                                &source_removed,
                                &target_removed,
                                migration_fn,
                            )
                            .expect("removed migration failed");
                            info!(removed_migrated, "removed entries migration complete");
                        }
                        // Original author's TODO: the old-version keyspaces are
                        // left in place after migration and never dropped.
                        info!(source_removed_name, "TODO: cleanup source keyspaces");
                    }
                    // Same hashpoint/index/event pass as the non-empty branch,
                    // now over the freshly migrated data.
                    let tx = collection.raw().db.db.read_tx();
                    let iter =
                        tx.iter(collection.tree())
                            .filter_map(|item| match item.into_inner() {
                                Ok(kv) => Some(kv),
                                Err(e) => {
                                    error!(%e);
                                    None
                                }
                            });
                    let (hash, count) = migration::compute_hashpoints::<Item, F>(
                        iter,
                        &collection.raw().inner.hashpoints,
                        f,
                        |key, val| {
                            collection.update_indexes(key, None, Some(val));
                            collection.upsert_event(key, None, val);
                        },
                    )
                    .expect("cannot compute hash");
                    let time = instant.elapsed().as_millis();
                    debug!(
                        ?hash,
                        ?count,
                        ?time,
                        "tree compute_hashpoints after migration"
                    );
                }
            }
        }
        // Persist the up-to-date metadata so the next open sees the current
        // type hash and version (this is what makes future migrations fire).
        let version = <Item as Record>::VERSION;
        let collection_info = CollectionInfo {
            typ_hash: current_type_hash,
            version,
        };
        if version > 0 {
            debug!(version);
        }
        db.db_info.update(|info| {
            info.collections.insert(name.clone(), collection_info);
        });
        histogram!("armour_logdb_tx_builder_build_duration", "name" => name.clone())
            .record(start.elapsed().as_secs_f64());
        collection
    }
}