use std::collections::BTreeMap;
use ipld_core::ipld::Ipld;
use crate::codec::hash_to_cid;
use crate::error::Error;
use crate::id::{Cid, NodeId};
use crate::objects::{
AdjacencyBucket, AdjacencyEntry, Edge, IncomingAdjacencyBucket, IncomingAdjacencyEntry,
IndexSet, Node,
};
use crate::prolly::{self, Cursor, ProllyKey};
use crate::repo::readonly::decode_from_store;
use crate::store::Blockstore;
use super::wrap_index_decode_error;
/// Build a complete `IndexSet` over the node and edge prolly trees rooted at
/// `nodes_root` / `edges_root`, persist every derived block into `bs`, and
/// return the CID of the resulting `IndexSet`.
///
/// The produced set contains:
/// * `nodes_by_label` — one prolly tree per node type, keyed by node key.
/// * `nodes_by_prop` — one prolly tree per (node type, property name),
///   keyed by a 16-byte hash of the property value (see [`prop_value_hash`]).
/// * `outgoing` / `incoming` — adjacency buckets keyed by src / dst node id;
///   both are `None` when the graph has no edges at all.
///
/// # Errors
/// Propagates cursor iteration, decode, hashing, and blockstore errors.
/// Decode failures are wrapped with context via `wrap_index_decode_error`.
#[tracing::instrument(
    level = "debug",
    target = "mnem::index",
    skip(bs, nodes_root, edges_root)
)]
pub fn build_index_set<B: Blockstore + ?Sized>(
    bs: &B,
    nodes_root: &Cid,
    edges_root: &Cid,
) -> Result<Cid, Error> {
    // Pass 1: scan all nodes, grouping them by label and by (label, prop).
    let mut label_groups: BTreeMap<String, BTreeMap<ProllyKey, Cid>> = BTreeMap::new();
    let mut prop_groups: BTreeMap<(String, String), BTreeMap<ProllyKey, Cid>> = BTreeMap::new();
    let node_cursor = Cursor::new(bs, nodes_root)?;
    for entry in node_cursor {
        let (node_key, node_cid) = entry?;
        let node: Node = decode_from_store(bs, &node_cid).map_err(|e| {
            wrap_index_decode_error(
                e,
                format!("IndexSet build: decode node at key {node_key:?}"),
                &node_cid,
            )
        })?;
        label_groups
            .entry(node.ntype.clone())
            .or_default()
            .insert(node_key, node_cid.clone());
        for (prop_name, prop_value) in &node.props {
            // Property values are indexed under a stable 16-byte content hash.
            let hash_key = prop_value_hash(prop_value)?;
            prop_groups
                .entry((node.ntype.clone(), prop_name.clone()))
                .or_default()
                .insert(ProllyKey::new(hash_key), node_cid.clone());
        }
    }
    // Materialize one prolly tree per label.
    let mut nodes_by_label: BTreeMap<String, Cid> = BTreeMap::new();
    for (label, entries) in label_groups {
        let root = prolly::build_tree(bs, entries)?;
        nodes_by_label.insert(label, root);
    }
    // Materialize one prolly tree per (label, property name).
    let mut nodes_by_prop: BTreeMap<String, BTreeMap<String, Cid>> = BTreeMap::new();
    for ((label, prop_name), entries) in prop_groups {
        let root = prolly::build_tree(bs, entries)?;
        nodes_by_prop
            .entry(label)
            .or_default()
            .insert(prop_name, root);
    }
    // Pass 2: scan all edges, grouping them by source and destination node.
    let mut outgoing_groups: BTreeMap<NodeId, Vec<AdjacencyEntry>> = BTreeMap::new();
    let mut incoming_groups: BTreeMap<NodeId, Vec<IncomingAdjacencyEntry>> = BTreeMap::new();
    let edge_cursor = Cursor::new(bs, edges_root)?;
    for entry in edge_cursor {
        let (edge_key, edge_cid) = entry?;
        // Wrap decode failures with context, consistent with the node pass above
        // (previously a bare `?` that lost the key and "which tree" information).
        let edge: Edge = decode_from_store(bs, &edge_cid).map_err(|e| {
            wrap_index_decode_error(
                e,
                format!("IndexSet build: decode edge at key {edge_key:?}"),
                &edge_cid,
            )
        })?;
        outgoing_groups
            .entry(edge.src)
            .or_default()
            .push(AdjacencyEntry {
                label: edge.etype.clone(),
                edge: edge_cid.clone(),
            });
        incoming_groups
            .entry(edge.dst)
            .or_default()
            .push(IncomingAdjacencyEntry {
                label: edge.etype,
                src: edge.src,
                edge: edge_cid,
            });
    }
    // Outgoing adjacency: one persisted bucket per source node, entries in a
    // deterministic (label, edge) order so identical graphs hash identically.
    let outgoing = if outgoing_groups.is_empty() {
        None
    } else {
        let mut bucket_entries: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
        for (src_id, mut edges) in outgoing_groups {
            edges.sort_by(|a, b| a.label.cmp(&b.label).then(a.edge.cmp(&b.edge)));
            let bucket = AdjacencyBucket {
                edges,
                extra: BTreeMap::new(),
            };
            let (bytes, cid) = hash_to_cid(&bucket)?;
            bs.put_trusted(cid.clone(), bytes)?;
            bucket_entries.insert(ProllyKey::from(src_id), cid);
        }
        Some(prolly::build_tree(bs, bucket_entries)?)
    };
    // Incoming adjacency: same idea keyed by destination, ordered by
    // (label, src, edge) for determinism.
    let incoming = if incoming_groups.is_empty() {
        None
    } else {
        let mut bucket_entries: BTreeMap<ProllyKey, Cid> = BTreeMap::new();
        for (dst_id, mut edges) in incoming_groups {
            edges.sort_by(|a, b| {
                a.label
                    .cmp(&b.label)
                    .then(a.src.cmp(&b.src))
                    .then(a.edge.cmp(&b.edge))
            });
            let bucket = IncomingAdjacencyBucket {
                edges,
                extra: BTreeMap::new(),
            };
            let (bytes, cid) = hash_to_cid(&bucket)?;
            bs.put_trusted(cid.clone(), bytes)?;
            bucket_entries.insert(ProllyKey::from(dst_id), cid);
        }
        Some(prolly::build_tree(bs, bucket_entries)?)
    };
    // Persist the assembled IndexSet and hand back its CID.
    let set = IndexSet {
        nodes_by_label,
        nodes_by_prop,
        outgoing,
        incoming,
        extra: BTreeMap::new(),
    };
    let (bytes, cid) = hash_to_cid(&set)?;
    bs.put_trusted(cid.clone(), bytes)?;
    Ok(cid)
}
/// Extend an existing `IndexSet` with newly added nodes without rescanning
/// the whole graph.
///
/// Only `nodes_by_label` and `nodes_by_prop` are rebuilt (merging the added
/// entries into the affected subtrees); the adjacency indexes (`outgoing` /
/// `incoming`) and `extra` are carried over unchanged, since this entry point
/// receives no edge additions.
///
/// # Errors
/// Propagates decode, hashing, tree-building, and blockstore errors.
/// Decode failures for added nodes are wrapped with context via
/// `wrap_index_decode_error`.
#[tracing::instrument(
    level = "debug",
    target = "mnem::index",
    skip(bs, base_indexes_cid, added_nodes),
    fields(added_count = added_nodes.len())
)]
pub fn incremental_append_indexes<B: Blockstore + ?Sized>(
    bs: &B,
    base_indexes_cid: &Cid,
    added_nodes: &BTreeMap<NodeId, Cid>,
) -> Result<Cid, Error> {
    let base: IndexSet = decode_from_store(bs, base_indexes_cid)?;
    // Group the additions exactly the way build_index_set does.
    let mut label_additions: BTreeMap<String, BTreeMap<ProllyKey, Cid>> = BTreeMap::new();
    let mut prop_additions: BTreeMap<(String, String), BTreeMap<ProllyKey, Cid>> = BTreeMap::new();
    for (node_id, node_cid) in added_nodes {
        let node: Node = decode_from_store(bs, node_cid).map_err(|e| {
            wrap_index_decode_error(
                e,
                format!("incremental_append_indexes: decode added node {node_id:?}"),
                node_cid,
            )
        })?;
        let key = ProllyKey::from(*node_id);
        label_additions
            .entry(node.ntype.clone())
            .or_default()
            .insert(key, node_cid.clone());
        for (prop_name, prop_value) in &node.props {
            let hash_key = prop_value_hash(prop_value)?;
            prop_additions
                .entry((node.ntype.clone(), prop_name.clone()))
                .or_default()
                .insert(ProllyKey::new(hash_key), node_cid.clone());
        }
    }
    // Take ownership of the base maps instead of cloning them wholesale.
    // Each label / (label, prop) key occurs exactly once in the additions
    // maps (they are BTreeMap keys), so looking the old subtree root up in
    // the not-yet-overwritten map is equivalent to looking it up in `base`.
    let IndexSet {
        nodes_by_label: mut new_nodes_by_label,
        nodes_by_prop: mut new_nodes_by_prop,
        outgoing,
        incoming,
        extra,
    } = base;
    for (label, additions) in label_additions {
        // Clone only the small root CID so the map can be mutated below.
        let base_sub = new_nodes_by_label.get(&label).cloned();
        let new_sub = rebuild_subtree_with_additions(bs, base_sub.as_ref(), additions)?;
        new_nodes_by_label.insert(label, new_sub);
    }
    for ((label, prop_name), additions) in prop_additions {
        let base_sub = new_nodes_by_prop
            .get(&label)
            .and_then(|m| m.get(&prop_name))
            .cloned();
        let new_sub = rebuild_subtree_with_additions(bs, base_sub.as_ref(), additions)?;
        new_nodes_by_prop
            .entry(label)
            .or_default()
            .insert(prop_name, new_sub);
    }
    // Persist the updated IndexSet and return its CID.
    let set = IndexSet {
        nodes_by_label: new_nodes_by_label,
        nodes_by_prop: new_nodes_by_prop,
        outgoing,
        incoming,
        extra,
    };
    let (bytes, cid) = hash_to_cid(&set)?;
    bs.put_trusted(cid.clone(), bytes)?;
    Ok(cid)
}
/// Merge `additions` into the prolly subtree rooted at `base_sub` (if any)
/// and build a fresh tree, returning its root CID.
///
/// With no base subtree, the additions alone become the new tree. Otherwise
/// every existing entry is read out and the additions are layered on top, so
/// an addition with an existing key replaces the old value.
fn rebuild_subtree_with_additions<B: Blockstore + ?Sized>(
    bs: &B,
    base_sub: Option<&Cid>,
    additions: BTreeMap<ProllyKey, Cid>,
) -> Result<Cid, Error> {
    let Some(root) = base_sub else {
        // Nothing to merge with — the additions are the whole subtree.
        return prolly::build_tree(bs, additions);
    };
    // Drain the existing subtree into a map, short-circuiting on any
    // cursor error, then overlay the additions (later inserts win).
    let mut merged = Cursor::new(bs, root)?.collect::<Result<BTreeMap<ProllyKey, Cid>, _>>()?;
    merged.extend(additions);
    prolly::build_tree(bs, merged)
}
/// Hash an IPLD property value to a stable 16-byte key.
///
/// The value is serialized as DAG-CBOR (streamed straight into the hasher,
/// no intermediate buffer) and hashed with BLAKE3; the first 16 bytes of
/// the digest form the key.
///
/// # Errors
/// Returns an encode error if the value cannot be serialized as DAG-CBOR.
pub fn prop_value_hash(value: &Ipld) -> Result<[u8; 16], Error> {
    // io::Write adapter that feeds every serialized byte into the hasher.
    struct DigestSink<'a>(&'a mut blake3::Hasher);
    impl std::io::Write for DigestSink<'_> {
        fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
            self.0.update(data);
            Ok(data.len())
        }
        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }
    let mut hasher = blake3::Hasher::new();
    serde_ipld_dagcbor::to_writer(DigestSink(&mut hasher), value)
        .map_err(|e| crate::error::CodecError::Encode(e.to_string()))?;
    // Truncate the 32-byte BLAKE3 digest to the 16-byte key width.
    let mut key = [0u8; 16];
    key.copy_from_slice(&hasher.finalize().as_bytes()[..16]);
    Ok(key)
}