contextdb-graph 0.3.4

Graph executor for contextdb — bounded BFS, DAG enforcement, typed edges
Documentation
use contextdb_core::{Direction, EdgeType, Error, GraphExecutor, RowId, Value};
use contextdb_graph::{GraphStore, MemGraphExecutor};
use contextdb_tx::{TxManager, WriteSet, WriteSetApplicator};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;

struct TestStore {
    graph: Arc<GraphStore>,
}

impl WriteSetApplicator for TestStore {
    fn apply(&self, ws: WriteSet) -> contextdb_core::Result<()> {
        self.graph.apply_inserts(ws.adj_inserts);
        self.graph.apply_deletes(ws.adj_deletes);
        Ok(())
    }

    fn new_row_id(&self) -> RowId {
        RowId(1)
    }
}

fn setup() -> (Arc<TxManager<TestStore>>, MemGraphExecutor<TestStore>) {
    let graph = Arc::new(GraphStore::new());
    let tx_mgr = Arc::new(TxManager::new(TestStore {
        graph: graph.clone(),
    }));
    let exec = MemGraphExecutor::new(graph, tx_mgr.clone());
    (tx_mgr, exec)
}

fn connect(
    tx_mgr: &TxManager<TestStore>,
    exec: &MemGraphExecutor<TestStore>,
    source: Uuid,
    target: Uuid,
    edge_type: &str,
) {
    let tx = tx_mgr.begin();
    exec.insert_edge(tx, source, target, edge_type.to_string(), HashMap::new())
        .unwrap();
    tx_mgr.commit(tx).unwrap();
}

#[test]
fn bfs_depth_bound_on_chain() {
    let (tx_mgr, exec) = setup();
    let nodes: Vec<Uuid> = (0..6).map(|_| Uuid::new_v4()).collect();

    for i in 0..5 {
        connect(&tx_mgr, &exec, nodes[i], nodes[i + 1], "BASED_ON");
    }

    let r = exec
        .bfs(nodes[0], None, Direction::Outgoing, 1, 3, tx_mgr.snapshot())
        .unwrap();
    assert_eq!(r.nodes.len(), 3);
}

#[test]
fn bfs_cycle_no_duplicates() {
    let (tx_mgr, exec) = setup();
    let a = Uuid::new_v4();
    let b = Uuid::new_v4();
    let c = Uuid::new_v4();

    connect(&tx_mgr, &exec, a, b, "R");
    connect(&tx_mgr, &exec, b, c, "R");
    connect(&tx_mgr, &exec, c, a, "R");

    let r = exec
        .bfs(a, None, Direction::Outgoing, 1, 5, tx_mgr.snapshot())
        .unwrap();
    assert_eq!(r.nodes.len(), 2);
}

#[test]
fn bfs_respects_mvcc_snapshot() {
    let (tx_mgr, exec) = setup();
    let a = Uuid::new_v4();
    let b = Uuid::new_v4();
    let c = Uuid::new_v4();

    connect(&tx_mgr, &exec, a, b, "R");
    let snap1 = tx_mgr.snapshot();
    connect(&tx_mgr, &exec, b, c, "R");

    let old = exec.bfs(a, None, Direction::Outgoing, 1, 5, snap1).unwrap();
    assert_eq!(old.nodes.len(), 1);

    let new = exec
        .bfs(a, None, Direction::Outgoing, 1, 5, tx_mgr.snapshot())
        .unwrap();
    assert_eq!(new.nodes.len(), 2);
}

#[test]
fn accepts_max_depth_one() {
    let (tx_mgr, exec) = setup();
    let a = Uuid::new_v4();
    let b = Uuid::new_v4();
    let c = Uuid::new_v4();

    connect(&tx_mgr, &exec, a, b, "R");
    connect(&tx_mgr, &exec, b, c, "R");

    let r = exec
        .bfs(a, None, Direction::Outgoing, 1, 1, tx_mgr.snapshot())
        .unwrap();
    assert_eq!(r.nodes.len(), 1);
}

#[test]
fn bfs_visited_limit_error() {
    let (tx_mgr, exec) = setup();
    let root = Uuid::new_v4();

    let tx = tx_mgr.begin();
    for _ in 0..100_001 {
        exec.insert_edge(tx, root, Uuid::new_v4(), "R".to_string(), HashMap::new())
            .unwrap();
    }
    tx_mgr.commit(tx).unwrap();

    let err = exec
        .bfs(root, None, Direction::Outgoing, 1, 1, tx_mgr.snapshot())
        .unwrap_err();
    assert!(matches!(err, Error::BfsVisitedExceeded(100_000)));
}

#[test]
fn direction_incoming_and_both() {
    let (tx_mgr, exec) = setup();
    let a = Uuid::new_v4();
    let b = Uuid::new_v4();
    let c = Uuid::new_v4();

    connect(&tx_mgr, &exec, a, b, "R");
    connect(&tx_mgr, &exec, c, b, "R");

    let incoming = exec
        .neighbors(b, None, Direction::Incoming, tx_mgr.snapshot())
        .unwrap();
    assert_eq!(incoming.len(), 2);

    let both = exec
        .neighbors(b, None, Direction::Both, tx_mgr.snapshot())
        .unwrap();
    assert_eq!(both.len(), 2);
}

#[test]
fn edge_type_filtering_and_empty_graph() {
    let (tx_mgr, exec) = setup();
    let a = Uuid::new_v4();
    let b = Uuid::new_v4();

    let empty = exec
        .bfs(a, None, Direction::Outgoing, 1, 1, tx_mgr.snapshot())
        .unwrap();
    assert!(empty.nodes.is_empty());

    connect(&tx_mgr, &exec, a, b, "BASED_ON");

    let filter: Vec<EdgeType> = vec!["CITES".to_string()];
    let r = exec
        .bfs(
            a,
            Some(&filter),
            Direction::Outgoing,
            1,
            1,
            tx_mgr.snapshot(),
        )
        .unwrap();
    assert!(r.nodes.is_empty());

    let _ = Value::Null;
}