use super::adjacency::load_incoming;
use super::*;
use crate::id::{Cid, EdgeId, NodeId};
use crate::objects::{Edge, IndexSet, Node};
use crate::prolly::Cursor;
use crate::repo::ReadonlyRepo;
use crate::repo::readonly::decode_from_store;
use crate::store::{Blockstore, MemoryBlockstore, MemoryOpHeadsStore, OpHeadsStore};
use ipld_core::ipld::Ipld;
use std::collections::HashSet;
use std::sync::Arc;
/// Build a fresh pair of in-memory stores for a throwaway test repo.
fn stores() -> (Arc<dyn Blockstore>, Arc<dyn OpHeadsStore>) {
    let blocks: Arc<dyn Blockstore> = Arc::new(MemoryBlockstore::new());
    let heads: Arc<dyn OpHeadsStore> = Arc::new(MemoryOpHeadsStore::new());
    (blocks, heads)
}
#[test]
fn prop_value_hash_is_deterministic() {
    // Hashing the same Ipld value twice must yield identical digests.
    let value = Ipld::String("Alice".into());
    let first = prop_value_hash(&value).unwrap();
    let second = prop_value_hash(&value).unwrap();
    assert_eq!(first, second);
}
#[test]
fn prop_value_hash_changes_on_different_values() {
    // Distinct values should hash to distinct digests.
    let alice = prop_value_hash(&Ipld::String("Alice".into())).unwrap();
    let bob = prop_value_hash(&Ipld::String("Bob".into())).unwrap();
    assert_ne!(alice, bob);
}
#[test]
fn label_index_returns_only_matching_nodes() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // One node per label; only the "Person" one should come back.
    let person =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    let document =
        Node::new(NodeId::new_v7(), "Document").with_prop("title", Ipld::String("RFC".into()));
    tx.add_node(&person).unwrap();
    tx.add_node(&document).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    let results = Query::new(&repo).label("Person").execute().unwrap();
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].node.id, person.id);
}
#[test]
fn prop_eq_index_point_lookup() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Seed 50 decoys so the point lookup actually has to discriminate.
    for i in 0..50 {
        let decoy = Node::new(NodeId::new_v7(), "Person")
            .with_prop("name", Ipld::String(format!("Person{i}")));
        tx.add_node(&decoy).unwrap();
    }
    let target =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    tx.add_node(&target).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // An Eq predicate must surface exactly the one matching node.
    let results = Query::new(&repo)
        .label("Person")
        .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
        .execute()
        .unwrap();
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].node.id, target.id);
}
#[test]
fn outgoing_edges_are_returned_when_requested() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Graph: alice --knows--> bob.
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    let bob =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Bob".into()));
    tx.add_node(&alice).unwrap();
    tx.add_node(&bob).unwrap();
    tx.add_edge(&Edge::new(EdgeId::new_v7(), "knows", alice.id, bob.id))
        .unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // Querying alice with outgoing "knows" must hand back the edge to bob.
    let hits = Query::new(&repo)
        .label("Person")
        .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
        .with_outgoing("knows")
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].edges.len(), 1);
    assert_eq!(hits[0].edges[0].dst, bob.id);
}
#[test]
fn limit_is_respected() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // 20 matching nodes in, but the query may only surface 5 of them.
    for i in 0..20 {
        let person =
            Node::new(NodeId::new_v7(), "Person").with_prop("idx", Ipld::Integer(i128::from(i)));
        tx.add_node(&person).unwrap();
    }
    let repo = tx.commit("t", "seed").unwrap();
    let results = repo.query().label("Person").limit(5).execute().unwrap();
    assert_eq!(results.len(), 5);
}
#[test]
fn empty_repo_query_returns_uninitialized() {
    use crate::error::RepoError;
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    // No commit yet: querying must surface Uninitialized, not an empty result.
    match repo.query().label("Person").execute().unwrap_err() {
        Error::Repo(RepoError::Uninitialized) => {}
        e => panic!("expected Uninitialized, got {e:?}"),
    }
}
#[test]
fn where_eq_convenience_is_equivalent_to_where_prop() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    tx.add_node(&alice).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // Run both the sugar form and the explicit predicate form of the query.
    let via_where_eq = repo
        .query()
        .label("Person")
        .where_eq("name", "Alice")
        .execute()
        .unwrap();
    let via_where_prop = repo
        .query()
        .label("Person")
        .where_prop("name", PropPredicate::Eq(Ipld::String("Alice".into())))
        .execute()
        .unwrap();
    assert_eq!(via_where_eq.len(), 1);
    assert_eq!(via_where_prop.len(), 1);
    assert_eq!(via_where_eq[0].node.id, via_where_prop[0].node.id);
}
#[test]
fn resolve_or_create_pending_hits_in_tx() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Two identical resolves within one transaction must return the same id:
    // the second call is served from the pending (uncommitted) state.
    let first = tx.resolve_or_create_node("Person", "name", "Alice").unwrap();
    let second = tx.resolve_or_create_node("Person", "name", "Alice").unwrap();
    assert_eq!(first, second, "second resolve hits pending cache");
}
#[test]
fn resolve_or_create_hits_base_commit_index() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    // Commit Alice first so she is only reachable via the base commit's index.
    let mut tx = repo.start_transaction();
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    let expected = alice.id;
    tx.add_node(&alice).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // A fresh transaction should resolve her through that committed index.
    let mut tx2 = repo.start_transaction();
    let resolved = tx2.resolve_or_create_node("Person", "name", "Alice").unwrap();
    assert_eq!(resolved, expected, "second tx finds Alice via base index");
}
#[test]
fn resolve_or_create_creates_when_absent() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Nothing matches, so a brand-new node is minted.
    let carol_id = tx.resolve_or_create_node("Person", "name", "Carol").unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    let carol = repo
        .lookup_node(&carol_id)
        .unwrap()
        .expect("new Carol node should exist");
    assert_eq!(carol.ntype, "Person");
    assert_eq!(carol.props.get("name"), Some(&Ipld::String("Carol".into())));
}
#[test]
fn resolve_or_create_ignores_removed_in_tx() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    // Commit an Alice node...
    let mut tx = repo.start_transaction();
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    let original_id = alice.id;
    tx.add_node(&alice).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // ...then remove her inside a new transaction before resolving again.
    let mut tx2 = repo.start_transaction();
    tx2.remove_node(original_id);
    let new_id = tx2.resolve_or_create_node("Person", "name", "Alice").unwrap();
    assert_ne!(
        new_id, original_id,
        "removed node should not satisfy resolve; a fresh one is created"
    );
}
#[test]
fn update_then_query_sees_new_value_and_not_old() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let alice_id = NodeId::new_v7();
    // v1: Alice works at Acme.
    let mut tx = repo.start_transaction();
    tx.add_node(&Node::new(alice_id, "Person").with_prop("company", Ipld::String("Acme".into())))
        .unwrap();
    let repo = tx.commit("t", "v1").unwrap();
    // v2: same node id, company replaced with Beta.
    let mut tx2 = repo.start_transaction();
    tx2.add_node(&Node::new(alice_id, "Person").with_prop("company", Ipld::String("Beta".into())))
        .unwrap();
    let repo = tx2.commit("t", "v2").unwrap();
    // The new value is indexed...
    let at_beta = repo
        .query()
        .label("Person")
        .where_eq("company", "Beta")
        .execute()
        .unwrap();
    assert_eq!(at_beta.len(), 1);
    assert_eq!(at_beta[0].node.id, alice_id);
    // ...and the stale one is gone.
    let at_acme = repo
        .query()
        .label("Person")
        .where_eq("company", "Acme")
        .execute()
        .unwrap();
    assert!(at_acme.is_empty(), "old value should no longer be indexed");
}
#[test]
fn remove_then_query_via_label_index() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    // Commit a single Person...
    let mut tx = repo.start_transaction();
    let person = Node::new(NodeId::new_v7(), "Person");
    let person_id = person.id;
    tx.add_node(&person).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // ...then remove it in a follow-up commit.
    let mut tx2 = repo.start_transaction();
    tx2.remove_node(person_id);
    let repo = tx2.commit("t", "rm").unwrap();
    let hits = repo.query().label("Person").execute().unwrap();
    assert!(hits.is_empty(), "label index reflects removal");
}
#[test]
fn prop_index_collision_last_wins() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Two distinct nodes share the same indexed (label, prop) key.
    let first =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    let second =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    tx.add_node(&first).unwrap();
    tx.add_node(&second).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // The point index keeps a single entry for the colliding key...
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "Alice")
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    // NOTE(review): the test name says "last wins" but the winner's id is never
    // asserted — consider also checking hits[0].node.id against the expected node.
    // ...while both nodes themselves remain retrievable by id.
    assert!(repo.lookup_node(&first.id).unwrap().is_some());
    assert!(repo.lookup_node(&second.id).unwrap().is_some());
}
#[test]
fn index_set_is_deterministic_across_independent_builds() {
    /// Build the same two-node graph from scratch and return the IndexSet CID.
    fn build_once() -> Cid {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).unwrap();
        let mut tx = repo.start_transaction();
        let alice = Node::new(NodeId::from_bytes_raw([1u8; 16]), "Person")
            .with_prop("name", Ipld::String("Alice".into()));
        let bob = Node::new(NodeId::from_bytes_raw([2u8; 16]), "Person")
            .with_prop("name", Ipld::String("Bob".into()));
        tx.add_node(&alice).unwrap();
        tx.add_node(&bob).unwrap();
        let repo = tx.commit("det", "seed").unwrap();
        let commit = repo.head_commit().unwrap();
        commit.indexes.clone().expect("indexes present")
    }
    assert_eq!(
        build_once(),
        build_once(),
        "IndexSet CID must be byte-identical across independent builds"
    );
}
#[test]
fn query_first_returns_one_or_none() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    tx.add_node(&alice).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // A matching predicate yields Some(..).
    let hit = repo
        .query()
        .label("Person")
        .where_eq("name", "Alice")
        .first()
        .unwrap();
    assert!(hit.is_some());
    // A non-matching predicate yields None rather than an error.
    let miss = repo
        .query()
        .label("Person")
        .where_eq("name", "Nobody")
        .first()
        .unwrap();
    assert!(miss.is_none());
}
#[test]
fn query_one_errors_on_zero_or_many() {
    use crate::error::RepoError;
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Three engineers so a bare label query is ambiguous.
    for i in 0..3 {
        let person = Node::new(NodeId::new_v7(), "Person")
            .with_prop("team", Ipld::String("eng".into()))
            .with_prop("idx", Ipld::Integer(i128::from(i)));
        tx.add_node(&person).unwrap();
    }
    let repo = tx.commit("t", "seed").unwrap();
    // Zero matches -> NotFound.
    let zero = repo
        .query()
        .label("Person")
        .where_eq("team", "missing")
        .one()
        .unwrap_err();
    assert!(matches!(zero, Error::Repo(RepoError::NotFound)));
    // Several matches -> AmbiguousMatch.
    let many = repo.query().label("Person").one().unwrap_err();
    assert!(matches!(many, Error::Repo(RepoError::AmbiguousMatch)));
}
#[test]
fn query_with_no_indexes_falls_back_to_scan() {
    use crate::codec::hash_to_cid;
    use crate::objects::Commit;
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs.clone(), ohs.clone()).unwrap();
    let mut tx = repo.start_transaction();
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    tx.add_node(&alice).unwrap();
    let repo1 = tx.commit("t", "seed").unwrap();
    // Strip the index pointer from a copy of the head commit.
    let head = repo1.head_commit().unwrap();
    let mut stripped = head.clone();
    stripped.indexes = None;
    // NOTE(review): despite the test name, no query is ever executed against the
    // stripped commit — it is only re-encoded here. This proves a Commit with
    // `indexes = None` still serializes, but it does NOT exercise the scan
    // fallback. TODO: store `stripped`, open a repo at that commit, and query.
    let (bytes, _cid) = hash_to_cid::<Commit>(&stripped).unwrap();
    assert!(!bytes.is_empty());
}
#[test]
fn adjacency_reflects_edge_removal() {
    // `Edge` and `EdgeId` are already in scope from the module-level imports at
    // the top of this file; the previous function-local `use` lines were
    // redundant and have been dropped.
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let alice =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Alice".into()));
    let bob = Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Bob".into()));
    let e1 = Edge::new(EdgeId::new_v7(), "knows", alice.id, bob.id);
    let e1_id = e1.id;
    tx.add_node(&alice).unwrap();
    tx.add_node(&bob).unwrap();
    tx.add_edge(&e1).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // Before removal: the edge is visible via the outgoing adjacency.
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "Alice")
        .with_outgoing("knows")
        .execute()
        .unwrap();
    assert_eq!(hits[0].edges.len(), 1);
    // Remove the edge in a second commit and re-run the same query.
    let mut tx2 = repo.start_transaction();
    tx2.remove_edge(e1_id);
    let repo = tx2.commit("t", "rm").unwrap();
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "Alice")
        .with_outgoing("knows")
        .execute()
        .unwrap();
    assert_eq!(
        hits[0].edges.len(),
        0,
        "removed edge no longer appears in the rebuilt adjacency index"
    );
}
#[test]
fn prop_value_hash_discriminates_across_all_ipld_variants() {
    use crate::id::{CODEC_RAW, Multihash};
    // Build a foreign (ipld_core) Cid from our own Cid's byte encoding.
    let cid_for = |seed: u32| -> ipld_core::cid::Cid {
        let local = Cid::new(CODEC_RAW, Multihash::sha2_256(&seed.to_be_bytes()));
        ipld_core::cid::Cid::try_from(local.to_bytes().as_slice()).unwrap()
    };
    // One or more representatives of every Ipld variant, all pairwise distinct.
    let cases = vec![
        Ipld::Null,
        Ipld::Bool(true),
        Ipld::Bool(false),
        Ipld::Integer(0),
        Ipld::Integer(1),
        Ipld::Integer(-1),
        Ipld::Float(0.0),
        Ipld::Float(1.5),
        Ipld::String(String::new()),
        Ipld::String("a".into()),
        Ipld::String("aa".into()),
        Ipld::Bytes(vec![]),
        Ipld::Bytes(vec![0u8]),
        Ipld::Bytes(vec![0u8, 1u8]),
        Ipld::List(vec![]),
        Ipld::List(vec![Ipld::Integer(1)]),
        Ipld::List(vec![Ipld::Integer(1), Ipld::Integer(2)]),
        Ipld::Map(std::collections::BTreeMap::from([(
            "a".into(),
            Ipld::Integer(1),
        )])),
        Ipld::Map(std::collections::BTreeMap::from([(
            "a".into(),
            Ipld::Integer(2),
        )])),
        Ipld::Link(cid_for(1)),
        Ipld::Link(cid_for(2)),
    ];
    let digests: Vec<[u8; 16]> = cases
        .iter()
        .map(|v| prop_value_hash(v).unwrap())
        .collect();
    // No two distinct inputs may collide.
    let distinct: HashSet<_> = digests.iter().collect();
    assert_eq!(
        distinct.len(),
        digests.len(),
        "every distinct Ipld value should produce a distinct hash"
    );
    // Re-hashing each input must reproduce its digest.
    for (value, digest) in cases.iter().zip(digests.iter()) {
        let again = prop_value_hash(value).unwrap();
        assert_eq!(&again, digest, "hash is deterministic");
    }
}
proptest::proptest! {
    #[test]
    fn prop_value_hash_deterministic_over_ipld(
        seed in 0u64..=1_000_000,
    ) {
        // Assemble a nested map derived from the fuzz seed.
        let mut fields = std::collections::BTreeMap::new();
        fields.insert("k_int".to_string(), Ipld::Integer(i128::from(seed)));
        fields.insert("k_str".to_string(), Ipld::String(format!("v{seed}")));
        fields.insert(
            "k_list".to_string(),
            Ipld::List(vec![
                Ipld::Integer(i128::from(seed) * 2),
                Ipld::String(format!("x{seed}")),
            ]),
        );
        let sample = Ipld::Map(fields);
        let first = prop_value_hash(&sample).unwrap();
        let second = prop_value_hash(&sample).unwrap();
        proptest::prop_assert_eq!(first, second, "hash is a pure function of value");
    }
}
#[test]
fn node_with_no_outgoing_edges_yields_no_edges() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // A single node with zero edges of any kind.
    let loner =
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("Loner".into()));
    tx.add_node(&loner).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // Asking for outgoing "knows" edges must succeed with an empty edge list.
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "Loner")
        .with_outgoing("knows")
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert!(hits[0].edges.is_empty());
}
#[test]
fn incoming_adjacency_mirrors_outgoing() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // Graph: a->b, a->c, d->b — so b has fan-in 2 and c has fan-in 1.
    let person = |name: &str| {
        Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String(name.into()))
    };
    let a = person("A");
    let b = person("B");
    let c = person("C");
    let d = person("D");
    for node in [&a, &b, &c, &d] {
        tx.add_node(node).unwrap();
    }
    for (src, dst) in [(&a, &b), (&a, &c), (&d, &b)] {
        tx.add_edge(&Edge::new(EdgeId::new_v7(), "knows", src.id, dst.id))
            .unwrap();
    }
    let repo = tx.commit("t", "seed").unwrap();
    // B should see incoming edges from both A and D, and no outgoing edges.
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "B")
        .with_incoming("knows")
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].incoming_edges.len(), 2);
    let mut actual: Vec<_> = hits[0].incoming_edges.iter().map(|e| e.src).collect();
    actual.sort();
    let mut expected = vec![a.id, d.id];
    expected.sort();
    assert_eq!(actual, expected);
    assert!(hits[0].edges.is_empty());
    // C should see exactly one incoming edge, from A.
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "C")
        .with_incoming("knows")
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].incoming_edges.len(), 1);
    assert_eq!(hits[0].incoming_edges[0].src, a.id);
}
#[test]
fn incoming_adjacency_is_byte_stable_under_insertion_order() {
    /// Build a fixed 5-node graph, inserting edges in the given order, and
    /// return the CID of the resulting `incoming` tree.
    fn build(order: &[(u8, u8)]) -> Cid {
        let (bs, ohs) = stores();
        let repo = ReadonlyRepo::init(bs, ohs).unwrap();
        let mut tx = repo.start_transaction();
        // Deterministic node ids: all zeros except a tag in byte 0.
        let node_for = |tag: u8| {
            let mut raw = [0u8; 16];
            raw[0] = tag;
            Node::new(NodeId::from_bytes_raw(raw), "Person")
        };
        for tag in 1u8..=5u8 {
            tx.add_node(&node_for(tag)).unwrap();
        }
        for &(src, dst) in order {
            // Deterministic edge ids derived from the (src, dst) pair.
            let mut raw = [0u8; 16];
            raw[0] = src;
            raw[1] = dst;
            tx.add_edge(&Edge::new(
                EdgeId::from_bytes_raw(raw),
                "knows",
                node_for(src).id,
                node_for(dst).id,
            ))
            .unwrap();
        }
        let repo = tx.commit("t", "seed").unwrap();
        let idx_cid = repo.head_commit().unwrap().indexes.clone().unwrap();
        let store = repo.blockstore().clone();
        let idx: IndexSet = decode_from_store(&*store, &idx_cid).unwrap();
        idx.incoming.expect("incoming tree present")
    }
    // Same edge set, two different insertion orders.
    let forward: Vec<(u8, u8)> = vec![(1, 2), (1, 3), (4, 2), (2, 5), (3, 5)];
    let shuffled: Vec<(u8, u8)> = vec![(3, 5), (1, 3), (2, 5), (4, 2), (1, 2)];
    assert_eq!(
        build(&forward),
        build(&shuffled),
        "incoming tree CID must be independent of edge insertion order"
    );
}
#[test]
fn fan_in_1000_edges_to_one_node_works() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let celebrity =
        Node::new(NodeId::new_v7(), "Celebrity").with_prop("name", Ipld::String("Star".into()));
    tx.add_node(&celebrity).unwrap();
    // 1000 fans, each with a deterministic id and a "follows" edge to the star.
    for i in 0..1000u32 {
        let mut raw = [0u8; 16];
        raw[..4].copy_from_slice(&i.to_be_bytes());
        raw[4] = 0xAA;
        let fan = Node::new(NodeId::from_bytes_raw(raw), "Fan");
        tx.add_node(&fan).unwrap();
        let follows = Edge::new(EdgeId::new_v7(), "follows", fan.id, celebrity.id);
        tx.add_edge(&follows).unwrap();
    }
    let repo = tx.commit("t", "seed").unwrap();
    // Time the fan-in query to catch accidental quadratic behavior.
    let started = std::time::Instant::now();
    let hits = repo
        .query()
        .label("Celebrity")
        .where_eq("name", "Star")
        .with_incoming("follows")
        .execute()
        .unwrap();
    let elapsed = started.elapsed();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].incoming_edges.len(), 1000);
    assert!(
        elapsed.as_millis() < 500,
        "fan-in 1000 query took {} ms (>500 ms budget; suggests an accidental O(n^2))",
        elapsed.as_millis()
    );
}
#[test]
fn self_loop_edge_appears_in_both_outgoing_and_incoming_for_same_node() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    // A node that loves itself: src == dst on the same edge.
    let node = Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("A".into()));
    let self_edge = Edge::new(EdgeId::new_v7(), "loves", node.id, node.id);
    let loop_id = self_edge.id;
    tx.add_node(&node).unwrap();
    tx.add_edge(&self_edge).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // Outgoing view: the loop shows up as an outgoing edge only.
    let out_hits = repo
        .query()
        .label("Person")
        .where_eq("name", "A")
        .with_outgoing("loves")
        .execute()
        .unwrap();
    assert_eq!(out_hits.len(), 1);
    assert_eq!(out_hits[0].edges.len(), 1);
    assert_eq!(out_hits[0].edges[0].id, loop_id);
    assert!(out_hits[0].incoming_edges.is_empty());
    // Incoming view: the same edge shows up as an incoming edge only.
    let in_hits = repo
        .query()
        .label("Person")
        .where_eq("name", "A")
        .with_incoming("loves")
        .execute()
        .unwrap();
    assert_eq!(in_hits.len(), 1);
    assert_eq!(in_hits[0].incoming_edges.len(), 1);
    assert_eq!(in_hits[0].incoming_edges[0].id, loop_id);
    assert!(in_hits[0].edges.is_empty());
}
#[test]
fn self_loop_deduplicated_in_with_any_direction() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let node = Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("A".into()));
    let self_edge = Edge::new(EdgeId::new_v7(), "loves", node.id, node.id);
    let loop_id = self_edge.id;
    tx.add_node(&node).unwrap();
    tx.add_edge(&self_edge).unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // With both directions requested, the self-loop must not be double-counted.
    let hits = repo
        .query()
        .label("Person")
        .where_eq("name", "A")
        .with_any_direction("loves")
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(
        hits[0].edges.len(),
        1,
        "self-loop must appear exactly once in the outgoing list"
    );
    assert_eq!(hits[0].edges[0].id, loop_id);
    assert!(
        hits[0].incoming_edges.is_empty(),
        "self-loop's incoming twin must be deduplicated"
    );
}
#[test]
fn incremental_append_preserves_incoming_cid_byte_equality() {
    /// A 16-byte id that is all zeros except one byte.
    fn byte_id(pos: usize, val: u8) -> [u8; 16] {
        let mut bytes = [0u8; 16];
        bytes[pos] = val;
        bytes
    }
    let ids: Vec<NodeId> = (0u8..6u8)
        .map(|i| NodeId::from_bytes_raw(byte_id(0, i)))
        .collect();
    let edge_defs: Vec<(usize, usize, [u8; 16])> = vec![
        (0, 1, byte_id(15, 1)),
        (0, 2, byte_id(15, 2)),
        (3, 1, byte_id(15, 3)),
    ];
    let extras: Vec<NodeId> = (0..3u8)
        .map(|i| {
            let mut bytes = byte_id(0, 0xEE);
            bytes[1] = i;
            NodeId::from_bytes_raw(bytes)
        })
        .collect();
    // Path A: seed everything, then append the extra nodes one commit at a time.
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    for id in &ids {
        tx.add_node(&Node::new(*id, "Person")).unwrap();
    }
    for (src, dst, eid) in &edge_defs {
        tx.add_edge(&Edge::new(
            EdgeId::from_bytes_raw(*eid),
            "knows",
            ids[*src],
            ids[*dst],
        ))
        .unwrap();
    }
    let mut repo = tx.commit("t", "seed").unwrap();
    for extra in &extras {
        let mut tx = repo.start_transaction();
        tx.add_node(&Node::new(*extra, "Person")).unwrap();
        repo = tx.commit("t", "append").unwrap();
    }
    let inc_cid = repo.head_commit().unwrap().indexes.clone().unwrap();
    let inc_set: IndexSet = decode_from_store(&*repo.blockstore().clone(), &inc_cid).unwrap();
    let incremental = inc_set.incoming.expect("incoming present");
    // Path B: build the identical graph in a single commit.
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    for id in ids.iter().chain(extras.iter()) {
        tx.add_node(&Node::new(*id, "Person")).unwrap();
    }
    for (src, dst, eid) in &edge_defs {
        tx.add_edge(&Edge::new(
            EdgeId::from_bytes_raw(*eid),
            "knows",
            ids[*src],
            ids[*dst],
        ))
        .unwrap();
    }
    let repo = tx.commit("t", "one-shot").unwrap();
    let full_cid = repo.head_commit().unwrap().indexes.clone().unwrap();
    let full_set: IndexSet = decode_from_store(&*repo.blockstore().clone(), &full_cid).unwrap();
    let one_shot = full_set.incoming.expect("incoming present");
    assert_eq!(
        incremental, one_shot,
        "incremental append path must preserve the `incoming` tree CID byte-for-byte"
    );
}
#[test]
fn incoming_tree_splits_into_multiple_leaves_at_scale() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let hub = Node::new(NodeId::new_v7(), "Hub");
    tx.add_node(&hub).unwrap();
    // 10k fan-out edges push the incoming tree well past a single chunk.
    for i in 0..10_000u32 {
        let mut b = [0u8; 16];
        b[..4].copy_from_slice(&i.to_be_bytes());
        b[4] = 0xDD;
        let dst = Node::new(NodeId::from_bytes_raw(b), "Leaf");
        tx.add_node(&dst).unwrap();
        tx.add_edge(&Edge::new(EdgeId::new_v7(), "links", hub.id, dst.id))
            .unwrap();
    }
    let repo = tx.commit("t", "seed").unwrap();
    let idx_cid = repo.head_commit().unwrap().indexes.clone().unwrap();
    let bs_handle = repo.blockstore().clone();
    let idx: IndexSet = decode_from_store(&*bs_handle, &idx_cid).unwrap();
    let inc_root = idx.incoming.expect("incoming tree present");
    // Walk the whole tree to prove no bucket was lost in the splits.
    let mut count = 0usize;
    let cursor = Cursor::new(&*bs_handle, &inc_root).unwrap();
    for e in cursor {
        let _ = e.unwrap();
        count += 1;
    }
    assert_eq!(count, 10_000, "all 10k incoming buckets must be reachable");
    // A split root stays small; an inlined-everything root would be huge.
    // (A former second assertion `root_bytes.len() < 200_000` was strictly
    // weaker than this bound and could never fire, so it was removed.)
    let root_bytes = bs_handle.get(&inc_root).unwrap().unwrap();
    assert!(
        root_bytes.len() < 32 * 1024,
        "root chunk size {} exceeds 32 KiB max (chunker misconfigured?)",
        root_bytes.len()
    );
}
#[test]
fn adjacency_cap_truncates_and_flags() {
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let target = Node::new(NodeId::new_v7(), "Target");
    tx.add_node(&target).unwrap();
    // 100 sources all pointing at the single target node.
    for i in 0..100u32 {
        let mut raw = [0u8; 16];
        raw[..4].copy_from_slice(&i.to_be_bytes());
        let src = Node::new(NodeId::from_bytes_raw(raw), "Src");
        tx.add_node(&src).unwrap();
        tx.add_edge(&Edge::new(EdgeId::new_v7(), "pts", src.id, target.id))
            .unwrap();
    }
    let repo = tx.commit("t", "seed").unwrap();
    // Cap at 10: only 10 incoming edges come back and truncation is flagged.
    let hits = repo
        .query()
        .label("Target")
        .with_incoming("pts")
        .adjacency_cap(10)
        .execute()
        .unwrap();
    assert_eq!(hits.len(), 1);
    assert_eq!(hits[0].incoming_edges.len(), 10);
    assert!(hits[0].edges_truncated);
}
#[test]
fn with_incoming_on_pre_0_3_indexset_returns_empty() {
    use crate::codec::hash_to_cid;
    let (bs, ohs) = stores();
    let repo = ReadonlyRepo::init(bs, ohs).unwrap();
    let mut tx = repo.start_transaction();
    let a = Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("A".into()));
    let b = Node::new(NodeId::new_v7(), "Person").with_prop("name", Ipld::String("B".into()));
    tx.add_node(&a).unwrap();
    tx.add_node(&b).unwrap();
    tx.add_edge(&Edge::new(EdgeId::new_v7(), "knows", a.id, b.id))
        .unwrap();
    let repo = tx.commit("t", "seed").unwrap();
    // Simulate a pre-0.3 IndexSet by stripping `incoming` and re-storing it.
    let store = repo.blockstore().clone();
    let idx_cid = repo.head_commit().unwrap().indexes.clone().unwrap();
    let mut legacy: IndexSet = decode_from_store(&*store, &idx_cid).unwrap();
    legacy.incoming = None;
    let (legacy_bytes, legacy_cid) = hash_to_cid(&legacy).unwrap();
    store.put_trusted(legacy_cid.clone(), legacy_bytes).unwrap();
    // Loading incoming edges against the legacy IndexSet must yield nothing.
    let reloaded: IndexSet = decode_from_store(&*store, &legacy_cid).unwrap();
    let wanted: HashSet<_> = std::iter::once("knows").collect();
    let (edges, truncated) =
        load_incoming(&*store, Some(&reloaded), b.id, &wanted, 100).unwrap();
    assert!(
        edges.is_empty(),
        "pre-0.3 IndexSet must return empty rather than falling back to a scan"
    );
    assert!(!truncated);
}