//! armour 0.30.27
//!
//! DDL and serialization for key-value storage.
//! Documentation for the transactional tree builder lives on the items below.
use std::{
    fmt::Debug,
    sync::{Arc, atomic::AtomicU64},
};

use crossbeam_skiplist::SkipMap;
use fjall::{KeyspaceCreateOptions, OptimisticTxKeyspace, Readable, Slice};

use super::{
    db::TxDb,
    raw::{InnerFields, TxRawTree},
    read::TxReadTree,
    tree::{IndexesMap, TxTree},
};
use crate::{
    GetType, Record,
    indexes::IndexUpdateCollection,
    migration,
    utils::{CollectionInfo, EventHandlers, IdChangeEventFn, RemoveEventFn, UpsertEventFn},
};

/// Builder for a transactional tree ([`TxTree`]) over records of type `Item`.
///
/// Accumulates secondary indexes and event handlers via the `add_*` methods,
/// then [`build`](Self::build) / [`build_with_handler`](Self::build_with_handler)
/// assemble the final `TxTree`, running any pending record-version migration
/// and computing hashpoints over existing data.
pub struct TxTreeBuilder<Item: Record> {
    // Logical (unversioned) collection name.
    name: String,
    // Handle to the owning transactional database.
    db: TxDb,
    // Versioned keyspace backing this collection.
    tree: OptimisticTxKeyspace,
    // Shared per-collection state: stored info, hashpoints, sequence counter.
    inner: InnerFields,
    // Secondary indexes kept in sync with the tree on build/upsert.
    index_maps: IndexesMap<Item>,
    // Optional upsert / remove / id-change callbacks.
    handlers: EventHandlers<Item>,
}

impl<Item: Record> TxTreeBuilder<Item> {
    /// Opens (or creates) the versioned keyspace for `Item` under `name` and
    /// returns a builder pre-seeded with the collection's persisted metadata.
    ///
    /// The on-disk partition name is derived from `name` plus
    /// `<Item as Record>::VERSION` via `migration::collection_name`, so each
    /// record version lives in its own partition.
    #[instrument(name = "TxTreeBuilder::with_name", skip(db, options))]
    pub(crate) fn with_name(db: &TxDb, name: &str, options: Option<KeyspaceCreateOptions>) -> Self {
        let start = std::time::Instant::now();
        let target_name = migration::collection_name(name, <Item as Record>::VERSION);
        let tree = db.open_tree(&target_name, options);

        debug!("tree opened");

        // Look up this collection's stored info (type hash + version) in the
        // database-level metadata; a brand-new collection gets defaults.
        let info = db
            .db_info
            .cloned()
            .collections
            .get(name)
            .cloned()
            .unwrap_or_default();

        let hashpoints = SkipMap::new();
        // NOTE(review): seq is seeded from approximate_len(), which by name is
        // an estimate — presumably only a starting value for a monotonically
        // increasing counter; confirm an approximate seed is acceptable here.
        let seq = AtomicU64::new(tree.approximate_len() as u64);

        let inner = InnerFields {
            info,
            hashpoints,
            seq,
        };

        // Metric: time spent opening the tree and loading metadata.
        histogram!("armour_logdb_tx_builder_with_name_duration", "name" => name.to_owned())
            .record(start.elapsed().as_secs_f64());

        Self {
            name: name.to_owned(),
            db: db.clone(),
            tree,
            inner,
            index_maps: Default::default(),
            handlers: Default::default(),
        }
    }

    /// Shorthand for [`Self::with_name`] using the record's canonical
    /// `<Item as Record>::NAME`.
    pub(crate) fn new(db: &TxDb, options: Option<KeyspaceCreateOptions>) -> Self {
        Self::with_name(db, <Item as Record>::NAME, options)
    }

    /// Registers a secondary index; it is updated for every existing record
    /// during `build` and kept in sync afterwards.
    pub fn add_index<T>(mut self, index: T) -> Self
    where
        T: IndexUpdateCollection<Item> + 'static,
    {
        self.index_maps.push(Box::new(index));
        self
    }

    /// Registers a callback fired when a record's id changes.
    ///
    /// Signature: `Fn(&Item, old_id: &Id, new_id: &Id)`
    pub fn add_id_change_fns(mut self, f: IdChangeEventFn<Item>) -> Self {
        self.handlers.on_id_change = Some(f);
        self
    }

    /// Registers a callback fired on insert/update.
    ///
    /// Signature: `Fn(&Id, Change { old: &Item, new: &Item })`
    pub fn add_update_fn(mut self, f: UpsertEventFn<Item>) -> Self {
        self.handlers.on_upsert = Some(f);
        self
    }

    /// Registers a callback fired on removal.
    ///
    /// Signature: `Fn(&Id, &Item)`
    pub fn add_removed_fn(mut self, f: RemoveEventFn<Item>) -> Self {
        self.handlers.on_remove = Some(f);
        self
    }

    /// Estimated number of entries in the backing keyspace.
    pub fn approximate_len(&self) -> usize {
        self.tree.approximate_len()
    }

    /// Builds the [`TxTree`] without a per-record load handler.
    pub fn build(self) -> TxTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
    {
        self.inner_build(None::<fn(&Item::SelfId, &Item)>)
    }

    /// Builds the [`TxTree`], invoking `f` for every existing record while
    /// hashpoints are computed.
    pub fn build_with_handler<F>(self, f: F) -> TxTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        self.inner_build(Some(f))
    }

    /// Assembles the final [`TxTree`].
    ///
    /// If the target tree already holds data, iterates it once to compute
    /// hashpoints (warning if the stored type hash differs from the current
    /// one), updating indexes and firing upsert events per record. If the
    /// target tree is empty and a migration function exists for the stored
    /// version, migrates records (and the `__*_removed` partition) from the
    /// old versioned keyspace, then computes hashpoints on the migrated data.
    /// Finally persists the current type hash/version into the db-level
    /// collection info and records a build-duration metric.
    ///
    /// `f` — optional `Fn(&Id, &Item)` observer invoked per record during
    /// hashpoint computation.
    // iterate tree and compute hash for each N items with bromberg_sl2 crate
    #[instrument(name = "TxTreeBuilder::build", skip(self, f), fields(name = self.name))]
    fn inner_build<F>(self, f: Option<F>) -> TxTree<Item>
    where
        Item: Record<Value = Slice> + GetType + Debug,
        F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
    {
        let start = std::time::Instant::now();
        let TxTreeBuilder {
            db,
            tree,
            inner,
            name,
            index_maps,
            handlers,
        } = self;

        // Exact emptiness check (approximate_len would not be reliable here).
        let is_empty = tree.first_key_value().is_none();

        let info = inner.info;
        let target_name = migration::collection_name(&name, <Item as Record>::VERSION);

        // Companion partition holding removed entries for this collection.
        let removed = db
            .db
            .keyspace(&format!("__{target_name}_removed"), || {
                KeyspaceCreateOptions::default()
            })
            .expect("Failed to open partition");

        let raw = TxRawTree {
            db: db.clone(),
            name: name.clone(),
            partition_name: target_name.clone(),
            // Stable 64-bit hash of the logical name, used as a tree identifier.
            hashname: xxhash_rust::xxh3::xxh3_64(name.as_bytes()),
            attributes: &Item::ATTRIBUTES,
            tree,
            removed: removed.inner().clone(),
            inner: Arc::new(inner),
            meta_saved: Default::default(),
        };

        let index_maps = Arc::new(index_maps);
        let handlers = Arc::new(handlers);

        let collection = TxTree {
            inner: TxReadTree {
                raw,
                phantom: std::marker::PhantomData,
            },
            index_maps,
            handlers,
        };

        let current_type_hash = <Item as GetType>::TYPE.h();

        if !is_empty {
            // Existing data at the current version: just (re)compute hashpoints.
            let instant = std::time::Instant::now();

            // Type change without a version bump is only warned about, not
            // treated as a migration.
            if info.typ_hash != current_type_hash {
                warn!(
                    "collection type hash changed from {:#X} to {:#X}",
                    info.typ_hash, current_type_hash
                );
            }

            debug!("starting tree compute_hashpoints");
            let tx = collection.raw().db.db.read_tx();
            // Corrupt/unreadable entries are logged and skipped rather than
            // aborting the whole build.
            let iter = tx
                .iter(collection.tree())
                .filter_map(|item| match item.into_inner() {
                    Ok(kv) => Some(kv),
                    Err(e) => {
                        error!(%e);
                        None
                    }
                });

            // Besides hashing, this pass rebuilds indexes and fires upsert
            // events (old value = None) for every stored record.
            let (hash, count) = migration::compute_hashpoints::<Item, F>(
                iter,
                &collection.raw().inner.hashpoints,
                f,
                |key, val| {
                    collection.update_indexes(key, None, Some(val));
                    collection.upsert_event(key, None, val);
                },
            )
            .expect("cannot compute hash");
            let time = instant.elapsed().as_millis();

            debug!(?hash, ?count, ?time, "tree compute_hashpoints end");
        } else {
            // Target tree is empty: migration from an older versioned keyspace
            // may be needed. A migration fn must exist for the stored version.
            let needs_migration = info.version != <Item as Record>::VERSION
                && Item::MIGRATIONS
                    .iter()
                    .any(|(version, _)| *version == info.version);

            if needs_migration {
                let migration_fn = Item::MIGRATIONS
                    .iter()
                    .copied()
                    .find_map(|(version, mf)| {
                        if version == info.version {
                            info!(?version, "found migration");
                            Some(mf)
                        } else {
                            None
                        }
                    })
                    .expect("migration fn must exist");

                let source_name = migration::collection_name(&name, info.version);

                // Open source as non-transactional Keyspace for migration read
                let source_ks = db
                    .db
                    .inner()
                    .keyspace(&source_name, KeyspaceCreateOptions::default)
                    .expect("Failed to open source keyspace for migration");

                // Nothing to migrate (or emptiness unknown) -> skip entirely.
                if !source_ks.is_empty().unwrap_or(true) {
                    let instant = std::time::Instant::now();

                    // Target: open as non-transactional for migration write
                    let target_ks = db
                        .db
                        .inner()
                        .keyspace(&target_name, KeyspaceCreateOptions::default)
                        .expect("Failed to open target keyspace for migration");

                    let migrated =
                        migration::run_migration::<Item>(&source_ks, &target_ks, migration_fn)
                            .expect("migration failed");

                    info!(migrated, "migration complete");

                    // Migrate removed entries (same record format, so the same
                    // migration fn applies).
                    let source_removed_name = format!("__{source_name}_removed");
                    if let Ok(source_removed) = db
                        .db
                        .inner()
                        .keyspace(&source_removed_name, KeyspaceCreateOptions::default)
                    {
                        if !source_removed.is_empty().unwrap_or(true) {
                            let target_removed_name = format!("__{target_name}_removed");
                            let target_removed = db
                                .db
                                .inner()
                                .keyspace(&target_removed_name, KeyspaceCreateOptions::default)
                                .expect("Failed to open target removed keyspace");
                            let removed_migrated = migration::run_migration::<Item>(
                                &source_removed,
                                &target_removed,
                                migration_fn,
                            )
                            .expect("removed migration failed");
                            info!(removed_migrated, "removed entries migration complete");
                        }
                        // TODO: delete_keyspace for OptimisticTxDatabase
                        info!(source_removed_name, "TODO: cleanup source keyspaces");
                    }

                    // Now compute hashpoints on migrated data
                    let tx = collection.raw().db.db.read_tx();
                    let iter =
                        tx.iter(collection.tree())
                            .filter_map(|item| match item.into_inner() {
                                Ok(kv) => Some(kv),
                                Err(e) => {
                                    error!(%e);
                                    None
                                }
                            });

                    // Same post-migration pass as the non-empty branch:
                    // hash, rebuild indexes, fire upsert events.
                    let (hash, count) = migration::compute_hashpoints::<Item, F>(
                        iter,
                        &collection.raw().inner.hashpoints,
                        f,
                        |key, val| {
                            collection.update_indexes(key, None, Some(val));
                            collection.upsert_event(key, None, val);
                        },
                    )
                    .expect("cannot compute hash");
                    let time = instant.elapsed().as_millis();

                    debug!(
                        ?hash,
                        ?count,
                        ?time,
                        "tree compute_hashpoints after migration"
                    );
                }
            }
        }

        let version = <Item as Record>::VERSION;

        // Persist the now-current type hash and version so the next open can
        // detect type changes and pending migrations.
        let collection_info = CollectionInfo {
            typ_hash: current_type_hash,
            version,
        };
        if version > 0 {
            debug!(version);
        }

        db.db_info.update(|info| {
            info.collections.insert(name.clone(), collection_info);
        });

        // Metric: total build time including migration and hashpoints.
        histogram!("armour_logdb_tx_builder_build_duration", "name" => name.clone())
            .record(start.elapsed().as_secs_f64());

        collection
    }
}