armour 0.30.27

DDL and serialization for key-value storage
Documentation
use std::{fmt::Debug, sync::Arc};

use crossbeam_skiplist::SkipMap;
use fjall::{Keyspace, KeyspaceCreateOptions, Slice};

use super::{
    db::Db,
    raw::{InnerFields, RawTree},
    tree::TypedTree,
};
use crate::{GetType, Record, logdb::ReadTree, migration, utils::CollectionInfo};

/// Staged constructor for a [`TypedTree`].
///
/// Holds the already-opened keyspace for the current `Item::VERSION` together with the
/// collection metadata read from the database, until `build` / `build_with_handler`
/// finalises it (running a version migration and/or loading hashpoints as needed).
pub struct TreeBuilder<Item: Record> {
    /// Logical (unversioned) collection name; also the key into `db_info.collections`.
    name: String,
    /// Handle to the owning database.
    db: Db,
    /// Keyspace opened under `migration::collection_name(name, Item::VERSION)`.
    tree: Keyspace,
    /// Stored collection info plus the (initially empty) hashpoint map.
    inner: InnerFields,
    /// Ties the builder to `Item` without storing a value of it.
    phantom: std::marker::PhantomData<Item>,
}

impl<Item: Record> TreeBuilder<Item> {
    #[instrument(name = "TreeBuilder::with_name", skip(db, options))]
    pub(crate) fn with_name(db: &Db, name: &str, options: Option<KeyspaceCreateOptions>) -> Self {
        let start = std::time::Instant::now();
        let target_name = migration::collection_name(name, <Item as Record>::VERSION);
        let tree = db.open_tree(&target_name, options);

        debug!("tree opened");

        let info = db
            .db_info
            .cloned()
            .collections
            .get(name)
            .cloned()
            .unwrap_or_default();

        let hashpoints = SkipMap::new();

        let inner = InnerFields { info, hashpoints };

        histogram!("armour_logdb_builder_with_name_duration", "name" => name.to_owned())
            .record(start.elapsed().as_secs_f64());

        Self {
            name: name.to_owned(),
            db: db.clone(),
            tree,
            inner,
            phantom: std::marker::PhantomData,
        }
    }

    pub(crate) fn new(db: &Db, options: Option<KeyspaceCreateOptions>) -> Self {
        Self::with_name(db, <Item as Record>::NAME, options)
    }

    pub fn approximate_len(&self) -> usize {
        self.tree.approximate_len()
    }

    pub fn build(self) -> TypedTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
    {
        self.inner_build(None::<fn(&Item::SelfId, &Item)>)
    }

    pub fn build_with_handler<F>(self, f: F) -> TypedTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        self.inner_build(Some(f))
    }

    /// Fn(&Id, &Item)
    // iterate tree and compute hash for each N items with bromberg_sl2 crate
    #[instrument(name = "TreeBuilder::build", skip(self, f), fields(name = self.name))]
    fn inner_build<F>(self, f: Option<F>) -> TypedTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        let start = std::time::Instant::now();
        let TreeBuilder {
            db,
            tree,
            inner,
            name,
            phantom: _,
        } = self;

        let is_empty = tree.is_empty().expect("cannot check if tree is empty");

        let info = inner.info;
        let target_name = migration::collection_name(&name, <Item as Record>::VERSION);

        let removed = db
            .db
            .keyspace(&format!("__{target_name}_removed"), || {
                KeyspaceCreateOptions::default()
            })
            .expect("Failed to open partition");

        let raw = RawTree {
            db: db.clone(),
            name: name.clone(),
            partition_name: target_name.clone(),
            hashname: xxhash_rust::xxh3::xxh3_64(name.as_bytes()),
            attributes: &Item::ATTRIBUTES,
            tree,
            removed,
            inner: Arc::new(inner),
            meta_saved: Default::default(),
            seq_lock: Arc::default(),
        };

        let collection = TypedTree {
            inner: ReadTree {
                raw,
                phantom: std::marker::PhantomData,
            },
        };

        let current_type_hash = <Item as GetType>::TYPE.h();

        if !is_empty {
            let instant = std::time::Instant::now();

            if info.typ_hash != current_type_hash {
                warn!(
                    "collection type hash changed from {:#X} to {:#X}",
                    info.typ_hash, current_type_hash
                );
            }

            debug!("starting tree compute_hashpoints");
            let iter = collection
                .tree()
                .iter()
                .filter_map(|item| match item.into_inner() {
                    Ok(kv) => Some(kv),
                    Err(e) => {
                        error!(%e);
                        None
                    }
                });

            let (hash, count) = migration::compute_hashpoints::<Item, F>(
                iter,
                &collection.raw().inner.hashpoints,
                f,
                |_key, _val| {},
            )
            .expect("cannot compute hash");
            let time = instant.elapsed().as_millis();

            debug!(?hash, ?count, ?time, "tree compute_hashpoints end");
        } else {
            // Check if migration is needed
            let needs_migration = info.version != <Item as Record>::VERSION
                && Item::MIGRATIONS
                    .iter()
                    .any(|(version, _)| *version == info.version);

            if needs_migration {
                let migration_fn = Item::MIGRATIONS
                    .iter()
                    .copied()
                    .find_map(|(version, mf)| {
                        if version == info.version {
                            info!(?version, "found migration");
                            Some(mf)
                        } else {
                            None
                        }
                    })
                    .expect("migration fn must exist");

                let source_name = migration::collection_name(&name, info.version);
                let source = db.open_tree(&source_name, None);

                if !source.is_empty().unwrap_or(true) {
                    let instant = std::time::Instant::now();

                    let migrated =
                        migration::run_migration::<Item>(&source, collection.tree(), migration_fn)
                            .expect("migration failed");

                    info!(migrated, "migration complete");

                    // Migrate removed entries
                    let source_removed_name = format!("__{source_name}_removed");
                    if let Some(source_removed) = db.get_keyspace(&source_removed_name) {
                        if !source_removed.is_empty().unwrap_or(true) {
                            let target_removed =
                                db.open_tree(&format!("__{target_name}_removed"), None);
                            let removed_migrated = migration::run_migration::<Item>(
                                &source_removed,
                                &target_removed,
                                migration_fn,
                            )
                            .expect("removed migration failed");
                            info!(removed_migrated, "removed entries migration complete");
                        }
                        if let Err(e) = db.db.delete_keyspace(source_removed) {
                            error!(%e, "failed to delete source removed keyspace");
                        }
                    }
                    if let Err(e) = db.db.delete_keyspace(source) {
                        error!(%e, "failed to delete source keyspace");
                    }

                    // Now compute hashpoints on migrated data
                    let iter =
                        collection
                            .tree()
                            .iter()
                            .filter_map(|item| match item.into_inner() {
                                Ok(kv) => Some(kv),
                                Err(e) => {
                                    error!(%e);
                                    None
                                }
                            });

                    let (hash, count) = migration::compute_hashpoints::<Item, F>(
                        iter,
                        &collection.raw().inner.hashpoints,
                        f,
                        |_key, _val| {},
                    )
                    .expect("cannot compute hash");
                    let time = instant.elapsed().as_millis();

                    debug!(
                        ?hash,
                        ?count,
                        ?time,
                        "tree compute_hashpoints after migration"
                    );
                }
            }
        }

        let version = <Item as Record>::VERSION;

        let collection_info = CollectionInfo {
            typ_hash: current_type_hash,
            version,
        };
        if version > 0 {
            debug!(version);
        }

        db.db_info.update(|info| {
            info.collections.insert(name.clone(), collection_info);
        });

        histogram!("armour_logdb_builder_build_duration", "name" => name.clone())
            .record(start.elapsed().as_secs_f64());

        collection
    }
}