armour 0.30.27

DDL and serialization for key-value storage
Documentation
use std::{collections::BTreeMap, fmt::Debug};

use fjall::{Keyspace, Slice};
use xxhash_rust::xxh3::Xxh3Default;

use crate::{
    Cid, Record,
    error::{DbError, DbResult},
    record::{MigrationFn, MigrationRes, MigrationType},
    utils::{GroupVal, HashPoints},
};

pub(crate) fn collection_name(base_name: &str, version: u16) -> String {
    format!("{base_name}:v{version}")
}

/// Migrate data from source keyspace to target keyspace.
/// Returns number of migrated entries.
pub(crate) fn run_migration<Item>(
    source: &Keyspace,
    target: &Keyspace,
    migration_fn: MigrationFn<Item, Slice>,
) -> DbResult<u64>
where
    Item: Record<Value = Slice>,
{
    let mut count = 0u64;

    for item in source.iter() {
        let (k, bytes) = item.into_inner()?;
        let key = k.as_ref();

        match migration_fn(key, &bytes) {
            MigrationRes::Unchanged(..) => {
                target.insert(key, bytes)?;
                count += 1;
            }
            MigrationRes::Deleted => {}
            MigrationRes::Changed {
                migration_type,
                new_key,
                new_value,
            } => {
                match migration_type {
                    MigrationType::Key => {
                        let encoded = Cid::encode(&new_key);
                        target.insert(encoded.as_ref(), bytes)?;
                    }
                    MigrationType::Value => {
                        let val = Record::ser(&new_value);
                        target.insert(key, val)?;
                    }
                    MigrationType::Entry => {
                        let encoded = Cid::encode(&new_key);
                        let val = Record::ser(&new_value);
                        target.insert(encoded.as_ref(), val)?;
                    }
                }
                count += 1;
            }
        }
    }

    Ok(count)
}

/// Iterate entries, compute group-based xxh3 hashes and call handlers.
/// Returns (total_hash, count).
pub(crate) fn compute_hashpoints<Item, F>(
    iter: impl Iterator<Item = (Slice, Slice)>,
    hashpoints: &HashPoints,
    mut handle_fn: Option<F>,
    mut on_entry: impl FnMut(&Item::SelfId, &Item),
) -> DbResult<(u64, u64)>
where
    Item: Record<Value = Slice> + Debug,
    F: for<'a> FnMut(&'a Item::SelfId, &'a Item),
{
    let mut map = BTreeMap::<u32, Xxh3Default>::new();
    let mut count = 0u64;

    for (k, bytes) in iter {
        let key = <Item as Record>::SelfId::from_bytes(&k).map_err(DbError::from)?;
        let val = <Item as Record>::deser(&bytes);

        if let Some(f) = handle_fn.as_mut() {
            f(&key, &val);
        }

        on_entry(&key, &val);

        count += 1;

        let group_id = key.group_id();
        let hasher = map.entry(group_id).or_default();
        hasher.update(&k);
        hasher.update(&bytes);
    }

    let mut hasher = Xxh3Default::new();

    for (k, v) in map {
        hasher.update(&k.to_le_bytes());
        hashpoints.insert(
            k,
            GroupVal {
                hash: v.digest(),
                changed: false,
            },
        );
    }

    Ok((hasher.digest(), count))
}