infinite-db 0.3.0

A spatial-graph database using n-dimensional curves and hyperedges for engineering logic.
Documentation
//! Concurrent query execution over sealed blocks + live tail.

use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicU64, Ordering};

use crate::infinitedb_core::{
    address::{DimensionVector, RevisionId, SpaceId},
    block::{BlockId, Record},
    space::SpaceRegistry,
};
use crate::infinitedb_index::composite::KeyConfig;
use crate::infinitedb_index::key::{hilbert_key_for, hilbert_key_standard};
use crate::infinitedb_storage::nvme::BlockStore;

use crate::infinitedb_core::branch::BranchId;

use super::branch_overlay::BranchOverlayStore;
use super::hilbert_live_tails::HilbertLiveTails;
use super::live_tail::LiveTailView;
use super::snapshot_store::SnapshotStore;
use super::space_live_tails::SpaceLiveTails;

pub fn space_key(spaces: &SpaceRegistry, space: SpaceId, point: &DimensionVector) -> u128 {
    match spaces.get(space) {
        Some(config) => hilbert_key_for(point, KeyConfig {
            bits_per_dim: config.bits_per_dim,
        }),
        None => hilbert_key_standard(point),
    }
}

fn live_tail_for_space(
    space: SpaceId,
    live_tail: Option<&LiveTailView>,
    space_live_tails: Option<&SpaceLiveTails>,
    hilbert_live_tails: Option<&HilbertLiveTails>,
) -> Vec<Record> {
    if let Some(hilbert) = hilbert_live_tails {
        let mut records = Vec::new();
        for tail in hilbert.tails_for_space(space.0) {
            records.extend(tail.snapshot());
        }
        return records
            .into_iter()
            .filter(|r| r.address.space == space)
            .collect();
    }
    if let Some(tails) = space_live_tails {
        return tails
            .get(space.0)
            .map(|t| t.snapshot())
            .unwrap_or_default();
    }
    live_tail
        .map(|t| t.snapshot())
        .unwrap_or_default()
        .into_iter()
        .filter(|r| r.address.space == space)
        .collect()
}

pub fn query_inner(
    store: &BlockStore,
    snapshots: &SnapshotStore,
    live_tail: Option<&LiveTailView>,
    space_live_tails: Option<&SpaceLiveTails>,
    spaces: &SpaceRegistry,
    revision: &AtomicU64,
    space: SpaceId,
    key_range: Option<(u128, u128)>,
    as_of: Option<RevisionId>,
    include_tombstones: bool,
    hilbert_live_tails: Option<&HilbertLiveTails>,
    branch_overlays: Option<&BranchOverlayStore>,
    branch_id: Option<BranchId>,
) -> std::io::Result<Vec<Record>> {
    let rev_ceiling = as_of.unwrap_or_else(|| RevisionId(revision.load(Ordering::Acquire)));

    let mut tail = live_tail_for_space(space, live_tail, space_live_tails, hilbert_live_tails);
    if let (Some(overlays), Some(branch)) = (branch_overlays, branch_id) {
        if branch != BranchId::MAIN {
            tail.extend(overlays.live_records(branch, space));
        }
    }
    let mut tombstoned: HashSet<Vec<u32>> = tail
        .iter()
        .filter(|r| r.address.space == space && r.tombstone && r.revision <= rev_ceiling)
        .map(|r| r.address.point.coords.clone())
        .collect();

    let mut results: Vec<Record> = Vec::new();

    if let Some(snapshot) = snapshots.get(space) {
        let block_ids: Vec<BlockId> = match key_range {
            None => snapshot.blocks.values().map(|e| e.block_id).collect(),
            Some((lo, hi)) => snapshot
                .blocks
                .iter()
                .filter(|(min_key, entry)| **min_key <= hi && entry.max_key >= lo)
                .map(|(_, entry)| entry.block_id)
                .collect(),
        };

        for block_id in &block_ids {
            let block = store.read_block_shared(*block_id)?;
            for record in &block.records {
                if record.address.space == space
                    && record.tombstone
                    && record.revision <= rev_ceiling
                {
                    tombstoned.insert(record.address.point.coords.clone());
                }
            }
        }

        for block_id in block_ids {
            let block = store.read_block_shared(block_id)?;
            for record in block.records.iter() {
                if record.revision > rev_ceiling {
                    continue;
                }
                if !include_tombstones && record.tombstone {
                    continue;
                }
                if let Some((lo, hi)) = key_range {
                    if lo == hi {
                        let k = space_key(spaces, space, &record.address.point);
                        if k != lo {
                            continue;
                        }
                    }
                }
                results.push(record.clone());
            }
        }

        if !include_tombstones {
            results.retain(|r| !tombstoned.contains(&r.address.point.coords));
        }
    }

    for record in tail {
        let visible = record.address.space == space
            && record.revision <= rev_ceiling
            && (include_tombstones || !record.tombstone)
            && (include_tombstones || !tombstoned.contains(&record.address.point.coords));
        if visible {
            if let Some((lo, hi)) = key_range {
                let k = space_key(spaces, space, &record.address.point);
                if k < lo || k > hi {
                    continue;
                }
            }
            results.push(record);
        }
    }

    Ok(results)
}

pub fn query_bbox(
    store: &BlockStore,
    snapshots: &SnapshotStore,
    live_tail: Option<&LiveTailView>,
    space_live_tails: Option<&SpaceLiveTails>,
    spaces: &SpaceRegistry,
    revision: &AtomicU64,
    space: SpaceId,
    min: DimensionVector,
    max: DimensionVector,
    as_of: Option<RevisionId>,
    hilbert_live_tails: Option<&HilbertLiveTails>,
    branch_overlays: Option<&BranchOverlayStore>,
    branch_id: Option<BranchId>,
) -> std::io::Result<Vec<Record>> {
    assert_eq!(min.dims(), max.dims(), "min and max must have equal dimensions");
    let k_min = space_key(spaces, space, &min);
    let k_max = space_key(spaces, space, &max);
    let (lo, hi) = if k_min <= k_max {
        (k_min, k_max)
    } else {
        (k_max, k_min)
    };
    let mut results = query_inner(
        store,
        snapshots,
        live_tail,
        space_live_tails,
        spaces,
        revision,
        space,
        Some((lo, hi)),
        as_of,
        false,
        hilbert_live_tails,
        branch_overlays,
        branch_id,
    )?;
    results.retain(|r| r.address.point.within(&min, &max));
    Ok(results)
}

pub fn snapshots_map_for_persist(snapshots: &SnapshotStore) -> BTreeMap<u64, crate::infinitedb_core::snapshot::Snapshot> {
    snapshots
        .all()
        .into_iter()
        .map(|(k, v)| (k, (*v).clone()))
        .collect()
}