infinite-db 0.3.0

A spatial-graph database using n-dimensional curves and hyperedges for engineering logic.
Documentation
//! Branch-aware replication helpers for [`crate::InfiniteDb`].

use std::collections::HashMap;
use std::io;

use bincode::{config::standard, encode_to_vec};

use crate::infinitedb_core::{
    address::{DimensionVector, RevisionId, SpaceId},
    block::Record,
    branch::BranchId,
    merge::MergeStrategy,
};
use crate::InfiniteDb;

use super::merkle::{self, MerkleTree};

/// Negotiation payload exchanged before overlay transfer.
///
/// Built by [`branch_sync_state`]; it carries a summary of one branch, not
/// the records themselves.
#[derive(Debug, Clone)]
pub struct BranchSyncState {
    /// Branch the summary was computed for.
    pub branch: BranchId,
    /// Root hash of the Merkle tree produced by [`snapshot_merkle`] for `branch`.
    pub merkle_root: [u8; 32],
    /// Value of `InfiniteDb::revision` at the time the state was built.
    pub revision: RevisionId,
}

fn latest_per_address(mut records: Vec<Record>) -> Vec<Record> {
    let mut map: HashMap<Vec<u32>, Record> = HashMap::new();
    for record in records.drain(..) {
        let key = record.address.point.coords.clone();
        map.entry(key)
            .and_modify(|existing| {
                if record.revision > existing.revision {
                    *existing = record.clone();
                }
            })
            .or_insert(record);
    }
    let mut latest: Vec<Record> = map.into_values().collect();
    latest.sort_by_key(|r| (r.address.point.coords.clone(), r.revision.0));
    latest
}

/// Build a Merkle tree over the newest record per address on `branch` in `space`.
///
/// Each leaf is the hash of the bincode encoding of
/// `(address, data, tombstone)`; the record revision is not part of the
/// encoded tuple. Leaves follow the order returned by `latest_per_address`.
///
/// # Errors
///
/// Propagates failures from the branch query and reports encoding failures
/// as `io::ErrorKind::Other`.
pub fn snapshot_merkle(
    db: &InfiniteDb,
    space: SpaceId,
    branch: BranchId,
) -> io::Result<MerkleTree> {
    let visible = latest_per_address(db.query_on_branch(branch, space, None)?);
    let leaves = visible
        .iter()
        .map(|entry| {
            encode_to_vec((&entry.address, &entry.data, entry.tombstone), standard())
                .map(|bytes| merkle::hash_record(&bytes))
                .map_err(|err| io::Error::new(io::ErrorKind::Other, err))
        })
        .collect::<io::Result<Vec<_>>>()?;
    Ok(MerkleTree::build(&leaves))
}

/// Summarise `branch` in `space` for exchange with a peer.
///
/// The Merkle root is computed first via [`snapshot_merkle`], then the
/// database revision is read.
///
/// # Errors
///
/// Returns any error produced while building the Merkle snapshot.
pub fn branch_sync_state(
    db: &InfiniteDb,
    space: SpaceId,
    branch: BranchId,
) -> io::Result<BranchSyncState> {
    let tree = snapshot_merkle(db, space, branch)?;
    let merkle_root = tree.root();
    let revision = RevisionId(db.revision());
    Ok(BranchSyncState {
        branch,
        merkle_root,
        revision,
    })
}

/// Replay the overlay of `remote_branch` into a freshly created local branch.
///
/// `remote.sync()` is called first. A branch called `name` is then created
/// on `local` from `BranchId::MAIN`, and every record returned by
/// `all_live_records` for `remote_branch` is applied to it: tombstones
/// become deletes, everything else becomes an insert.
///
/// # Errors
///
/// Returns the stringified error of the first failing step. Records applied
/// before the failure are left on the new branch.
pub fn import_branch_overlay(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    remote_branch: BranchId,
    name: &str,
) -> Result<BranchId, String> {
    if let Err(err) = remote.sync() {
        return Err(err.to_string());
    }
    let imported = local.create_branch(name, BranchId::MAIN)?;

    for entry in remote.branch_overlays.all_live_records(remote_branch) {
        if !entry.tombstone {
            if let Err(err) = local.insert_on_branch(
                imported,
                entry.address.space,
                entry.address.point,
                entry.data,
            ) {
                return Err(err.to_string());
            }
            continue;
        }
        if let Err(err) =
            local.delete_on_branch(imported, entry.address.space, entry.address.point)
        {
            return Err(err.to_string());
        }
    }

    Ok(imported)
}

/// Fast-forward `local` main with records present on `remote` main (no conflict handling).
pub fn converge_main_records(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    space: SpaceId,
) -> io::Result<()> {
    remote.sync()?;
    let remote_records = remote.query(space, None)?;
    let local_records = local.query(space, None)?;
    let mut local_latest: std::collections::HashMap<Vec<u32>, Record> =
        std::collections::HashMap::new();
    for r in local_records {
        let key = r.address.point.coords.clone();
        local_latest
            .entry(key)
            .and_modify(|e| {
                if r.revision > e.revision {
                    *e = r.clone();
                }
            })
            .or_insert(r);
    }
    for record in remote_records {
        let coords = record.address.point.coords.clone();
        let insert = match local_latest.get(&coords) {
            None => true,
            Some(existing) => record.revision > existing.revision,
        };
        if insert {
            local.insert(space, DimensionVector::new(coords), record.data)?;
        }
    }
    local.sync()
}

/// Import a remote feature branch and merge it into local `main`.
///
/// Steps, in order: converge main records via [`converge_main_records`],
/// import `remote_branch` as a local branch named `"sync-import"`, merge
/// that branch into `BranchId::MAIN` using `strategy`, then flush `space`
/// and sync `local`.
///
/// # Errors
///
/// Returns an `io::ErrorKind::Other` error when the import fails or when the
/// merge reports any conflicts; other failures are propagated unchanged.
pub fn converge_with_branch_merge(
    local: &InfiniteDb,
    remote: &InfiniteDb,
    space: SpaceId,
    remote_branch: BranchId,
    strategy: MergeStrategy,
) -> io::Result<()> {
    converge_main_records(local, remote, space)?;

    let staging = match import_branch_overlay(local, remote, remote_branch, "sync-import") {
        Ok(branch) => branch,
        Err(reason) => return Err(io::Error::new(io::ErrorKind::Other, reason)),
    };

    let outcome = local.merge_branch(BranchId::MAIN, staging, strategy, None)?;
    if outcome.conflicts.is_empty() {
        local.flush(space)?;
        return local.sync();
    }

    Err(io::Error::new(
        io::ErrorKind::Other,
        format!("sync merge left {} conflicts", outcome.conflicts.len()),
    ))
}