infinite-db 0.3.0

A spatial-graph database using n-dimensional curves and hyperedges for engineering logic.
Documentation
//! Branch merge execution.

use std::collections::{HashMap, HashSet};
use std::io;
use crate::infinitedb_core::{
    address::{Address, SpaceId},
    block::Record,
    branch::{BranchId, BranchRegistry},
    merge::{MergeConflict, MergeResult, MergeStrategy},
};

use super::branch_overlay::BranchOverlayStore;
use super::query::query_inner;
use super::hilbert_live_tails::HilbertLiveTails;
use super::live_tail::LiveTailView;
use super::snapshot_store::SnapshotStore;
use super::space_live_tails::SpaceLiveTails;
use crate::infinitedb_core::space::SpaceRegistry;
use crate::infinitedb_storage::nvme::BlockStore;
use std::sync::atomic::AtomicU64;
type Resolver<'a> = Option<&'a (dyn Fn(MergeConflict) -> Record + Send + Sync)>;

fn latest_per_address(records: Vec<Record>) -> HashMap<Address, Record> {
    let mut map: HashMap<Address, Record> = HashMap::new();
    for record in records {
        map.entry(record.address.clone())
            .and_modify(|existing| {
                if record.revision > existing.revision {
                    *existing = record.clone();
                }
            })
            .or_insert(record);
    }
    map
}

fn records_equivalent(a: &Record, b: &Record) -> bool {
    a.tombstone == b.tombstone && a.data == b.data
}

fn pick_winner(
    conflict: &MergeConflict,
    strategy: MergeStrategy,
    resolver: Resolver<'_>,
) -> Result<Option<Record>, MergeConflict> {
    match strategy {
        MergeStrategy::PreferTarget => Ok(conflict.target.clone()),
        MergeStrategy::PreferSource => Ok(conflict.source.clone()),
        MergeStrategy::PreferHigherRevision => {
            let target_rev = conflict.target.as_ref().map(|r| r.revision.0).unwrap_or(0);
            let source_rev = conflict.source.as_ref().map(|r| r.revision.0).unwrap_or(0);
            if source_rev >= target_rev {
                Ok(conflict.source.clone())
            } else {
                Ok(conflict.target.clone())
            }
        }
        MergeStrategy::Interactive => {
            if let Some(resolve) = resolver {
                Ok(Some(resolve(conflict.clone())))
            } else {
                Err(conflict.clone())
            }
        }
    }
}

/// Three-way merge of `source` into `target`, reading overlay state from `branch_overlays`.
pub fn merge_branches(
    store: &BlockStore,
    snapshots: &SnapshotStore,
    live_tail: Option<&LiveTailView>,
    space_tails: Option<&SpaceLiveTails>,
    hilbert_tails: Option<&HilbertLiveTails>,
    branch_overlays: &BranchOverlayStore,
    spaces: &SpaceRegistry,
    revision: &AtomicU64,
    branches: &BranchRegistry,
    target: BranchId,
    source: BranchId,
    strategy: MergeStrategy,
    resolver: Resolver<'_>,
) -> io::Result<MergeResult> {
    let source_branch = branches
        .get(source)
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "source branch not found"))?;
    let forked_at = source_branch.forked_at;

    let source_live = branch_overlays.all_live_records(source);
    let source_by_addr = latest_per_address(source_live);

    let touched_spaces: HashSet<SpaceId> =
        source_by_addr.keys().map(|a| a.space).collect();

    let mut base_by_addr: HashMap<Address, Record> = HashMap::new();
    let mut target_by_addr: HashMap<Address, Record> = HashMap::new();

    for space in &touched_spaces.clone() {
        let base_records = query_inner(
            store,
            snapshots,
            live_tail,
            space_tails,
            spaces,
            revision,
            *space,
            None,
            Some(forked_at),
            true,
            hilbert_tails,
            None,
            None,
        )?;
        for record in base_records {
            if !record.tombstone {
                base_by_addr.insert(record.address.clone(), record);
            }
        }

        let target_records = query_inner(
            store,
            snapshots,
            live_tail,
            space_tails,
            spaces,
            revision,
            *space,
            None,
            None,
            true,
            hilbert_tails,
            if target == BranchId::MAIN {
                None
            } else {
                Some(branch_overlays)
            },
            if target == BranchId::MAIN {
                None
            } else {
                Some(target)
            },
        )?;
        for record in target_records {
            if !record.tombstone {
                target_by_addr.insert(record.address.clone(), record);
            }
        }
    }

    let mut conflicts = Vec::new();
    let mut to_apply: Vec<Record> = Vec::new();
    let mut auto_resolved = 0usize;

    for (address, source_record) in &source_by_addr {
        let base = base_by_addr.get(address).cloned();
        let target_record = target_by_addr.get(address).cloned();

        let source_changed = match (&base, source_record) {
            (None, r) if r.tombstone => false,
            (Some(b), s) => !records_equivalent(b, s),
            (None, _) => true,
        };
        if !source_changed {
            continue;
        }

        let target_changed = match (&base, &target_record) {
            (None, None) => false,
            (Some(b), Some(t)) => !records_equivalent(b, t),
            (None, Some(_)) => true,
            (Some(_), None) => true,
        };

        let diverged = match (&target_record, source_record) {
            (Some(t), s) => !records_equivalent(t, s),
            (None, s) if !s.tombstone => true,
            _ => false,
        };

        if target_changed && diverged {
            let conflict = MergeConflict {
                address: address.clone(),
                base: base.clone(),
                target: target_record.clone(),
                source: Some(source_record.clone()),
            };
            match pick_winner(&conflict, strategy, resolver) {
                Ok(Some(winner)) => {
                    auto_resolved += 1;
                    to_apply.push(winner);
                }
                Ok(None) => {}
                Err(c) => conflicts.push(c),
            }
        } else {
            to_apply.push(source_record.clone());
        }
    }

    if strategy == MergeStrategy::Interactive && !conflicts.is_empty() {
        return Ok(MergeResult {
            merged_records: 0,
            conflicts,
            auto_resolved,
            applied_records: Vec::new(),
        });
    }

    Ok(MergeResult {
        merged_records: to_apply.len(),
        conflicts,
        auto_resolved,
        applied_records: to_apply,
    })
}

/// Index of latest record per address from a slice.
pub fn index_latest(records: &[Record]) -> HashMap<Address, Record> {
    latest_per_address(records.to_vec())
}